This is an automated email from the ASF dual-hosted git repository.
mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 2c0eb6a87 [#3136] fix(spark-connector): support replace column with
same column name for spark connector (#3461)
2c0eb6a87 is described below
commit 2c0eb6a873bf1bb65dad0234eb0b699ba892ebae
Author: FANNG <[email protected]>
AuthorDate: Mon Jul 29 09:58:03 2024 +0800
[#3136] fix(spark-connector): support replace column with same column name
for spark connector (#3461)
### What changes were proposed in this pull request?
To support replacing columns:
* support deleting and re-adding a column with the **same column name** in the
valid column change logic of the Hive catalog
* remove the get-column-position logic when no column position is specified in
the Iceberg catalog
### Why are the changes needed?
Fix: #3136
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Added an integration test (IT).
---
.../catalog/hive/HiveCatalogOperations.java | 14 ++++---
.../iceberg/ops/IcebergTableOpsHelper.java | 44 +++++++++-------------
.../integration/test/CatalogIcebergBaseIT.java | 37 ++++++++++++++++++
gradle.properties | 2 +-
.../connector/integration/test/SparkCommonIT.java | 41 ++++++++++++++++++++
.../integration/test/hive/SparkHiveCatalogIT.java | 5 +++
.../test/iceberg/SparkIcebergCatalogIT.java | 5 +++
.../integration/test/util/SparkUtilIT.java | 3 ++
8 files changed, 118 insertions(+), 33 deletions(-)
diff --git
a/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
index a7f70b97e..afae1cbbd 100644
---
a/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
+++
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
@@ -674,11 +674,11 @@ public class HiveCatalogOperations implements
CatalogOperations, SupportsSchemas
.filter(c -> c instanceof TableChange.ColumnChange)
.forEach(
c -> {
- String fieldToAdd = String.join(".", ((TableChange.ColumnChange)
c).fieldName());
+ String fieldName = String.join(".", ((TableChange.ColumnChange)
c).fieldName());
Preconditions.checkArgument(
c instanceof TableChange.UpdateColumnComment
- || !partitionFields.contains(fieldToAdd),
- "Cannot alter partition column: " + fieldToAdd);
+ || !partitionFields.contains(fieldName),
+ "Cannot alter partition column: " + fieldName);
if (c instanceof TableChange.UpdateColumnPosition
&& afterPartitionColumn(
@@ -687,12 +687,16 @@ public class HiveCatalogOperations implements
CatalogOperations, SupportsSchemas
"Cannot alter column position to after partition column");
}
+ if (c instanceof TableChange.DeleteColumn) {
+ existingFields.remove(fieldName);
+ }
+
if (c instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) c;
- if (existingFields.contains(fieldToAdd)) {
+ if (existingFields.contains(fieldName)) {
throw new IllegalArgumentException(
- "Cannot add column with duplicate name: " + fieldToAdd);
+ "Cannot add column with duplicate name: " + fieldName);
}
if (addColumn.getPosition() == null) {
diff --git
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/ops/IcebergTableOpsHelper.java
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/ops/IcebergTableOpsHelper.java
index 176e2cbbe..86da0eaea 100644
---
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/ops/IcebergTableOpsHelper.java
+++
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/ops/IcebergTableOpsHelper.java
@@ -133,9 +133,13 @@ public class IcebergTableOpsHelper {
}
private void doUpdateColumnPosition(
- UpdateSchema icebergUpdateSchema, UpdateColumnPosition
updateColumnPosition) {
- doMoveColumn(
- icebergUpdateSchema, updateColumnPosition.fieldName(),
updateColumnPosition.getPosition());
+ UpdateSchema icebergUpdateSchema,
+ UpdateColumnPosition updateColumnPosition,
+ Schema icebergTableSchema) {
+ StructType tableSchema = icebergTableSchema.asStruct();
+ ColumnPosition columnPosition =
+ getColumnPositionForIceberg(tableSchema,
updateColumnPosition.getPosition());
+ doMoveColumn(icebergUpdateSchema, updateColumnPosition.fieldName(),
columnPosition);
}
private void doUpdateColumnType(
@@ -156,7 +160,9 @@ public class IcebergTableOpsHelper {
icebergUpdateSchema.updateColumn(fieldName, (PrimitiveType) type);
}
- private ColumnPosition getAddColumnPosition(StructType parent,
ColumnPosition columnPosition) {
+ // Iceberg doesn't support LAST position, transform to FIRST or AFTER.
+ private ColumnPosition getColumnPositionForIceberg(
+ StructType parent, ColumnPosition columnPosition) {
if (!(columnPosition instanceof TableChange.Default)) {
return columnPosition;
}
@@ -171,25 +177,7 @@ public class IcebergTableOpsHelper {
return ColumnPosition.after(last.name());
}
- private void doAddColumn(
- UpdateSchema icebergUpdateSchema, AddColumn addColumn, Schema
icebergTableSchema) {
- String parentName = getParentName(addColumn.fieldName());
- StructType parentStruct;
- if (parentName != null) {
- org.apache.iceberg.types.Type parent =
icebergTableSchema.findType(parentName);
- Preconditions.checkArgument(
- parent != null, "Couldn't find parent field: " + parentName + " in
Iceberg table");
- Preconditions.checkArgument(
- parent instanceof StructType,
- "Couldn't add column to non-struct field, name:"
- + parentName
- + ", type:"
- + parent.getClass().getSimpleName());
- parentStruct = (StructType) parent;
- } else {
- parentStruct = icebergTableSchema.asStruct();
- }
-
+ private void doAddColumn(UpdateSchema icebergUpdateSchema, AddColumn
addColumn) {
if (addColumn.isAutoIncrement()) {
throw new IllegalArgumentException("Iceberg doesn't support auto
increment column");
}
@@ -210,8 +198,9 @@ public class IcebergTableOpsHelper {
addColumn.getComment());
}
- ColumnPosition position = getAddColumnPosition(parentStruct,
addColumn.getPosition());
- doMoveColumn(icebergUpdateSchema, addColumn.fieldName(), position);
+ if (!ColumnPosition.defaultPos().equals(addColumn.getPosition())) {
+ doMoveColumn(icebergUpdateSchema, addColumn.fieldName(),
addColumn.getPosition());
+ }
}
private void alterTableProperty(
@@ -237,11 +226,12 @@ public class IcebergTableOpsHelper {
Schema icebergTableSchema) {
for (ColumnChange change : columnChanges) {
if (change instanceof AddColumn) {
- doAddColumn(icebergUpdateSchema, (AddColumn) change,
icebergTableSchema);
+ doAddColumn(icebergUpdateSchema, (AddColumn) change);
} else if (change instanceof DeleteColumn) {
doDeleteColumn(icebergUpdateSchema, (DeleteColumn) change,
icebergTableSchema);
} else if (change instanceof UpdateColumnPosition) {
- doUpdateColumnPosition(icebergUpdateSchema, (UpdateColumnPosition)
change);
+ doUpdateColumnPosition(
+ icebergUpdateSchema, (UpdateColumnPosition) change,
icebergTableSchema);
} else if (change instanceof RenameColumn) {
doRenameColumn(icebergUpdateSchema, (RenameColumn) change);
} else if (change instanceof UpdateColumnType) {
diff --git
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
index 078fe803c..7fd503550 100644
---
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
+++
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
@@ -584,6 +584,43 @@ public abstract class CatalogIcebergBaseIT extends
AbstractIT {
Assertions.assertEquals(0, tableIdentifiers.size());
}
+ @Test
+ public void testUpdateIcebergColumnDefaultPosition() {
+ Column col1 = Column.of("name", Types.StringType.get(), "comment");
+ Column col2 = Column.of("address", Types.StringType.get(), "comment");
+ Column col3 = Column.of("date_of_birth", Types.StringType.get(),
"comment");
+ Column[] newColumns = new Column[] {col1, col2, col3};
+ NameIdentifier tableIdentifier =
+ NameIdentifier.of(schemaName,
GravitinoITUtils.genRandomName("CatalogIcebergIT_table"));
+ catalog
+ .asTableCatalog()
+ .createTable(
+ tableIdentifier,
+ newColumns,
+ table_comment,
+ ImmutableMap.of(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ new SortOrder[0]);
+
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ tableIdentifier,
+ TableChange.updateColumnPosition(
+ new String[] {col1.name()},
TableChange.ColumnPosition.defaultPos()));
+
+ Table updateColumnPositionTable =
catalog.asTableCatalog().loadTable(tableIdentifier);
+
+ Column[] updateCols = updateColumnPositionTable.columns();
+ Assertions.assertEquals(3, updateCols.length);
+ Assertions.assertEquals(col2.name(), updateCols[0].name());
+ Assertions.assertEquals(col3.name(), updateCols[1].name());
+ Assertions.assertEquals(col1.name(), updateCols[2].name());
+
+ catalog.asTableCatalog().dropTable(tableIdentifier);
+ }
+
@Test
public void testAlterIcebergTable() {
Column[] columns = createColumns();
diff --git a/gradle.properties b/gradle.properties
index b4deeef33..5b36e4623 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -40,4 +40,4 @@ defaultScalaVersion = 2.12
pythonVersion = 3.8
# skipDockerTests is used to skip the tests that require Docker to be running.
-skipDockerTests = true
+skipDockerTests = false
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
index a67fcd218..63e4801ef 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
@@ -115,6 +115,8 @@ public abstract class SparkCommonIT extends SparkEnvIT {
protected abstract boolean supportsDelete();
+ protected abstract boolean supportsSchemaEvolution();
+
// Use a custom database not the original default database because
SparkCommonIT couldn't
// read&write data to tables in default database. The main reason is default
database location is
// determined by `hive.metastore.warehouse.dir` in hive-site.xml which is
local HDFS address
@@ -547,6 +549,45 @@ public abstract class SparkCommonIT extends SparkEnvIT {
checkTableColumns(tableName, updateCommentColumns,
getTableInfo(tableName));
}
+ @Test
+ void testAlterTableReplaceColumns() {
+ String tableName = "test_replace_columns_table";
+ dropTableIfExists(tableName);
+
+ createSimpleTable(tableName);
+ List<SparkColumnInfo> simpleTableColumns = getSimpleTableColumn();
+ SparkTableInfo tableInfo = getTableInfo(tableName);
+ checkTableColumns(tableName, simpleTableColumns, tableInfo);
+ checkTableReadWrite(tableInfo);
+ String firstLine = getExpectedTableData(tableInfo);
+
+ sql(
+ String.format(
+ "ALTER TABLE %S REPLACE COLUMNS (id int COMMENT 'new comment',
name2 string, age long);",
+ tableName));
+ ArrayList<SparkColumnInfo> updateColumns = new ArrayList<>();
+ // change comment for id
+ updateColumns.add(SparkColumnInfo.of("id", DataTypes.IntegerType, "new
comment"));
+ // change column name
+ updateColumns.add(SparkColumnInfo.of("name2", DataTypes.StringType, null));
+ // change column type
+ updateColumns.add(SparkColumnInfo.of("age", DataTypes.LongType, null));
+
+ tableInfo = getTableInfo(tableName);
+ checkTableColumns(tableName, updateColumns, tableInfo);
+ sql(String.format("INSERT INTO %S VALUES(3, 'name2', 10)", tableName));
+ List<String> data = getQueryData(String.format("SELECT * from %s ORDER BY
id", tableName));
+ Assertions.assertEquals(2, data.size());
+ if (supportsSchemaEvolution()) {
+ // It's different columns for Iceberg if delete and add a column with
same name.
+ Assertions.assertEquals(
+ String.join(",", Arrays.asList(NULL_STRING, NULL_STRING,
NULL_STRING)), data.get(0));
+ } else {
+ Assertions.assertEquals(firstLine, data.get(0));
+ }
+ Assertions.assertEquals("3,name2,10", data.get(1));
+ }
+
@Test
void testComplexType() {
String tableName = "complex_type_table";
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
index a4ce99811..5680e2e30 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
@@ -76,6 +76,11 @@ public abstract class SparkHiveCatalogIT extends
SparkCommonIT {
return false;
}
+ @Override
+ protected boolean supportsSchemaEvolution() {
+ return false;
+ }
+
@Test
void testCreateHiveFormatPartitionTable() {
String tableName = "hive_partition_table";
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
index 4df4992b5..52f4abf3a 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
@@ -99,6 +99,11 @@ public abstract class SparkIcebergCatalogIT extends
SparkCommonIT {
return true;
}
+ @Override
+ protected boolean supportsSchemaEvolution() {
+ return true;
+ }
+
@Override
protected String getTableLocation(SparkTableInfo table) {
return String.join(File.separator, table.getTableLocation(), "data");
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
index 3a25cb022..05637c0ce 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
@@ -45,6 +45,7 @@ import org.junit.jupiter.api.Assertions;
* <p>Referred from
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
*/
public abstract class SparkUtilIT extends AbstractIT {
+ protected static final String NULL_STRING = "NULL";
protected abstract SparkSession getSparkSession();
@@ -103,6 +104,8 @@ public abstract class SparkUtilIT extends AbstractIT {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
sdf.setTimeZone(TimeZone.getTimeZone(TIME_ZONE_UTC));
return sdf.format(timestamp);
+ } else if (item == null) {
+ return NULL_STRING;
} else {
return item.toString();
}