This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 66e95ce792 Evolve iceberg table schema for partition copy (#4142)
66e95ce792 is described below
commit 66e95ce792f8dc43a70715e1135e2ece52dc44df
Author: thisisArjit <[email protected]>
AuthorDate: Fri Oct 24 18:08:35 2025 +0530
Evolve iceberg table schema for partition copy (#4142)
---
.../iceberg/IcebergOverwritePartitionsStep.java | 56 +++++--
.../copy/iceberg/IcebergPartitionDataset.java | 13 +-
.../iceberg/IcebergPartitionDatasetFinder.java | 3 +
.../data/management/copy/iceberg/IcebergTable.java | 29 ++++
.../IcebergTableMetadataValidatorUtils.java | 28 +---
.../IcebergOverwritePartitionsStepTest.java | 7 +-
.../IcebergTableMetadataValidatorUtilsTest.java | 78 ----------
.../management/copy/iceberg/IcebergTableTest.java | 165 +++++++++++++++++++++
8 files changed, 258 insertions(+), 121 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
index 30dc2094f3..e7c83ee2d2 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import com.github.rholder.retry.Attempt;
@@ -57,6 +58,14 @@ import static
org.apache.gobblin.util.retry.RetryerFactory.RetryType;
@Slf4j
public class IcebergOverwritePartitionsStep implements CommitStep {
private final String destTableIdStr;
+ /**
+ * Updated schema must be enforced to be backwards compatible by the catalog
in
+ * the partition overwrite step otherwise previous partitions may become
unreadable.
+ * This schema is determined at the start of the copy job before data files
discovery and the same is
+ * committed before overwriting partitions. It is the responsibility of the
catalog to ensure that
+ * no conflicting commits happen in between the copy job.
+ */
+ private final Schema updatedSchema;
private final Properties properties;
// data files are populated once all the copy tasks are done. Each
IcebergPartitionCopyableFile has a serialized data file
@Setter private List<DataFile> dataFiles;
@@ -64,6 +73,8 @@ public class IcebergOverwritePartitionsStep implements
CommitStep {
private final String partitionValue;
public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX =
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
".catalog.overwrite.partitions.retries";
+ public static final String SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX =
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
+ ".schema.update.retries";
private static final Config RETRYER_FALLBACK_CONFIG =
ConfigFactory.parseMap(ImmutableMap.of(
RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L),
RETRY_TIMES, 3,
@@ -75,8 +86,9 @@ public class IcebergOverwritePartitionsStep implements
CommitStep {
* @param destTableIdStr the identifier of the destination table as a string
* @param properties the properties containing configuration
*/
- public IcebergOverwritePartitionsStep(String destTableIdStr, String
partitionColName, String partitionValue, Properties properties) {
+ public IcebergOverwritePartitionsStep(String destTableIdStr, Schema
updatedSchema, String partitionColName, String partitionValue, Properties
properties) {
this.destTableIdStr = destTableIdStr;
+ this.updatedSchema = updatedSchema;
this.partitionColName = partitionColName;
this.partitionValue = partitionValue;
this.properties = properties;
@@ -108,6 +120,13 @@ public class IcebergOverwritePartitionsStep implements
CommitStep {
dataFiles.size(),
dataFiles.get(0).path()
);
+ // update schema
+ Retryer<Void> schemaUpdateRetryer = createSchemaUpdateRetryer();
+ schemaUpdateRetryer.call(() -> {
+ destTable.updateSchema(this.updatedSchema, false);
+ return null;
+ });
+ // overwrite partitions
Retryer<Void> overwritePartitionsRetryer =
createOverwritePartitionsRetryer();
overwritePartitionsRetryer.call(() -> {
destTable.overwritePartition(dataFiles, this.partitionColName,
this.partitionValue);
@@ -146,17 +165,30 @@ public class IcebergOverwritePartitionsStep implements
CommitStep {
?
config.getConfig(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX)
: ConfigFactory.empty();
+ return getRetryer(retryerOverridesConfig);
+ }
+
+ private Retryer<Void> createSchemaUpdateRetryer() {
+ Config config = ConfigFactory.parseProperties(this.properties);
+ Config retryerOverridesConfig =
config.hasPath(IcebergOverwritePartitionsStep.SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX)
+ ?
config.getConfig(IcebergOverwritePartitionsStep.SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX)
+ : ConfigFactory.empty();
+
+ return getRetryer(retryerOverridesConfig);
+ }
+
+ private Retryer<Void> getRetryer(Config retryerOverridesConfig) {
return
RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG),
Optional.of(new RetryListener() {
- @Override
- public <V> void onRetry(Attempt<V> attempt) {
- if (attempt.hasException()) {
- String msg = String.format("~%s~ Exception while overwriting
partitions [attempt: %d; elapsed: %s]",
- destTableIdStr,
- attempt.getAttemptNumber(),
-
Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
- log.warn(msg, attempt.getExceptionCause());
- }
- }
- }));
+ @Override
+ public <V> void onRetry(Attempt<V> attempt) {
+ if (attempt.hasException()) {
+ String msg = String.format("~%s~ Exception occurred [attempt:
%d; elapsed: %s]",
+ destTableIdStr,
+ attempt.getAttemptNumber(),
+
Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
+ log.warn(msg, attempt.getExceptionCause());
+ }
+ }
+ }));
}
}
\ No newline at end of file
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
index 5600b47f08..1cc15c1ec1 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
@@ -98,6 +99,7 @@ public class IcebergPartitionDataset extends IcebergDataset {
// Differences are getting data files, copying ancestor permission and
adding post publish steps
String fileSet = this.getFileSetId();
IcebergTable srcIcebergTable = getSrcIcebergTable();
+ Schema srcTableSchema = srcIcebergTable.accessTableMetadata().schema();
List<DataFile> srcDataFiles =
srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
if (srcDataFiles.isEmpty()) {
@@ -128,7 +130,7 @@ public class IcebergPartitionDataset extends IcebergDataset
{
List<CopyEntity> copyEntities = getIcebergParitionCopyEntities(targetFs,
srcDataFiles, srcWriteDataLocation, destWriteDataLocation, partitionSpec,
copyConfig);
// Adding this check to avoid adding post publish step when there are no
files to copy.
if (CollectionUtils.isNotEmpty(copyEntities)) {
- copyEntities.add(createOverwritePostPublishStep());
+ copyEntities.add(createOverwritePostPublishStep(srcTableSchema));
}
log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
@@ -191,9 +193,16 @@ public class IcebergPartitionDataset extends
IcebergDataset {
return new Path(fileDir, newFileName);
}
- private PostPublishStep createOverwritePostPublishStep() {
+ /**
+ * Creates a {@link PostPublishStep} for overwriting partitions in the
destination Iceberg table.
+ * @param srcTableSchema Schema of the source Iceberg table which needs to
be passed to the
+ * overwrite step for updating destination table
schema.
+ * @return a {@link PostPublishStep} that performs the partition overwrite
operation.
+ */
+ private PostPublishStep createOverwritePostPublishStep(Schema
srcTableSchema) {
IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new
IcebergOverwritePartitionsStep(
this.getDestIcebergTable().getTableId().toString(),
+ srcTableSchema,
this.partitionColumnName,
this.partitionColValue,
this.properties
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
index 3d3e5c1917..2bcc13d14b 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
@@ -53,6 +53,9 @@ public class IcebergPartitionDatasetFinder extends
IcebergDatasetFinder {
boolean validateStrictPartitionEquality =
Boolean.parseBoolean(properties.getProperty(ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY,
DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY));
+ // This method only validates that the schema can be updated; no commit is
performed here
+
destIcebergTable.updateSchema(srcIcebergTable.accessTableMetadata().schema(),
true);
+
IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
srcIcebergTable.accessTableMetadata(),
destIcebergTable.accessTableMetadata(), validateStrictPartitionEquality);
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index fab570120b..0f6c371e0e 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -48,6 +49,7 @@ import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -315,4 +317,31 @@ public class IcebergTable {
log.info("~{}~ SnapshotId after overwrite: {}", tableId,
accessTableMetadata().currentSnapshot().snapshotId());
}
+ /**
+ * Updates the table's schema to the provided {@link Schema}.
+ * @param updatedSchema the updated schema to be set on the table.
+ * @param onlyValidate if true, only validates that the schema can be
updated without committing.
+ * @throws TableNotFoundException if the table does not exist.
+ */
+ protected void updateSchema(Schema updatedSchema, boolean onlyValidate)
throws TableNotFoundException {
+ TableMetadata currentTableMetadata = accessTableMetadata();
+ Schema currentSchema = currentTableMetadata.schema();
+
+ if (currentSchema.sameSchema(updatedSchema)) {
+ log.info("~{}~ schema is already up-to-date", tableId);
+ return;
+ }
+
+ log.info("~{}~ updating schema from {} to {}, commit: {}", tableId,
currentSchema, updatedSchema, !onlyValidate);
+
+ TableMetadata updatedTableMetadata =
currentTableMetadata.updateSchema(updatedSchema,
updatedSchema.highestFieldId());
+
Preconditions.checkArgument(updatedTableMetadata.schema().sameSchema(updatedSchema),
"Schema mismatch after update, please check destination table");
+
+ if (!onlyValidate) {
+ tableOps.commit(currentTableMetadata, updatedTableMetadata);
+ tableOps.refresh();
+ log.info("~{}~ schema updated successfully", tableId);
+ }
+ }
+
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
index 7c47f2dfb8..90dc90c412 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.data.management.copy.iceberg;
import java.io.IOException;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import lombok.extern.slf4j.Slf4j;
@@ -36,11 +35,7 @@ public class IcebergTableMetadataValidatorUtils {
}
/**
- * Compares the metadata of the given two iceberg tables.
- * <ul>
- * <li>First compares the schema of the metadata.</li>
- * <li>Then compares the partition spec of the metadata.</li>
- * </ul>
+ * Compares the partition spec of the metadata.
* @param tableMetadataA the metadata of the first table
* @param tableMetadataB the metadata of the second table
* @param validateStrictPartitionEquality boolean value to control
strictness of partition spec comparison
@@ -52,27 +47,6 @@ public class IcebergTableMetadataValidatorUtils {
tableMetadataA.metadataFileLocation(),
tableMetadataB.metadataFileLocation());
- Schema schemaA = tableMetadataA.schema();
- Schema schemaB = tableMetadataB.schema();
- // TODO: Need to add support for schema evolution
- // This check needs to be broken down into multiple checks to support
schema evolution
- // Possible cases - schemaA == schemaB,
- // - schemaA is subset of schemaB [ schemaB Evolved ],
- // - schemaA is superset of schemaB [ schemaA Evolved ],
- // - Other cases?
- // Also consider using Strategy or any other design pattern for this to
make it a better solution
- if (!schemaA.sameSchema(schemaB)) {
- String errMsg = String.format(
- "Schema Mismatch between Metadata{%s} - SchemaId{%d} and
Metadata{%s} - SchemaId{%d}",
- tableMetadataA.metadataFileLocation(),
- schemaA.schemaId(),
- tableMetadataB.metadataFileLocation(),
- schemaB.schemaId()
- );
- log.error(errMsg);
- throw new IOException(errMsg);
- }
-
PartitionSpec partitionSpecA = tableMetadataA.spec();
PartitionSpec partitionSpecB = tableMetadataB.spec();
// .compatibleWith() doesn't match for specId of partition spec and
fieldId of partition fields while .equals() does
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
index e4c1774e9a..634946ffd7 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
@@ -24,6 +24,7 @@ import java.util.Properties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -40,6 +41,7 @@ public class IcebergOverwritePartitionsStepTest {
private final String testPartitionColName = "testPartition";
private final String testPartitionColValue = "testValue";
private IcebergTable mockIcebergTable;
+ private Schema mockSchema;
private IcebergCatalog mockIcebergCatalog;
private Properties mockProperties;
private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep;
@@ -47,10 +49,11 @@ public class IcebergOverwritePartitionsStepTest {
@BeforeMethod
public void setUp() throws IOException {
mockIcebergTable = Mockito.mock(IcebergTable.class);
+ mockSchema = Mockito.mock(Schema.class);
mockIcebergCatalog = Mockito.mock(IcebergCatalog.class);
mockProperties = new Properties();
- spyIcebergOverwritePartitionsStep = Mockito.spy(new
IcebergOverwritePartitionsStep(destTableIdStr,
+ spyIcebergOverwritePartitionsStep = Mockito.spy(new
IcebergOverwritePartitionsStep(destTableIdStr, mockSchema,
testPartitionColName, testPartitionColValue, mockProperties));
spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());
@@ -112,7 +115,7 @@ public class IcebergOverwritePartitionsStepTest {
int retryCount = 7;
mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX
+ "." + RETRY_TIMES,
Integer.toString(retryCount));
- spyIcebergOverwritePartitionsStep = Mockito.spy(new
IcebergOverwritePartitionsStep(destTableIdStr,
+ spyIcebergOverwritePartitionsStep = Mockito.spy(new
IcebergOverwritePartitionsStep(destTableIdStr, mockSchema,
testPartitionColName, testPartitionColValue, mockProperties));
spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
index 2cbde2df95..4e9718653f 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
@@ -35,23 +35,6 @@ public class IcebergTableMetadataValidatorUtilsTest {
.requiredString("field1")
.requiredString("field2")
.endRecord());
- private static final Schema schema2IsNotSchema1Compat =
AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema2")
- .fields()
- .requiredString("field2")
- .requiredString("field1")
- .endRecord());
- private static final Schema schema3 =
AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema3")
- .fields()
- .requiredString("field1")
- .requiredString("field2")
- .requiredInt("field3")
- .endRecord());
- private static final Schema schema4IsNotSchema3Compat =
AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema4")
- .fields()
- .requiredInt("field1")
- .requiredString("field2")
- .requiredInt("field3")
- .endRecord());
private static final PartitionSpec partitionSpec1 =
PartitionSpec.builderFor(schema1)
.identity("field1")
.build();
@@ -59,9 +42,6 @@ public class IcebergTableMetadataValidatorUtilsTest {
schema1, unpartitionedPartitionSpec,
"tableLocationForSchema1WithUnpartitionedSpec", new HashMap<>());
private static final TableMetadata tableMetadataWithSchema1AndPartitionSpec1
= TableMetadata.newTableMetadata(
schema1, partitionSpec1, "tableLocationForSchema1WithPartitionSpec1",
new HashMap<>());
- private static final TableMetadata
tableMetadataWithSchema3AndUnpartitionedSpec = TableMetadata.newTableMetadata(
- schema3, unpartitionedPartitionSpec,
"tableLocationForSchema3WithUnpartitionedSpec", new HashMap<>());
- private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch
between Metadata";
private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition
Spec Mismatch between Metadata";
private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_TRUE = true;
private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_FALSE =
false;
@@ -74,64 +54,6 @@ public class IcebergTableMetadataValidatorUtilsTest {
Assert.assertTrue(true);
}
- @Test
- public void testValidateDifferentSchemaFails() {
- // Schema 1 and Schema 2 have different field order
-
- TableMetadata tableMetadataWithSchema2AndUnpartitionedSpec =
TableMetadata.newTableMetadata(schema2IsNotSchema1Compat,
- unpartitionedPartitionSpec,
"tableLocationForSchema2WithUnpartitionedSpec", new HashMap<>());
-
-
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
- tableMetadataWithSchema2AndUnpartitionedSpec,
SCHEMA_MISMATCH_EXCEPTION);
- }
-
- @Test
- public void testValidateSchemaWithDifferentTypesFails() {
- // schema 3 and schema 4 have different field types for field1
-
- TableMetadata tableMetadataWithSchema4AndUnpartitionedSpec =
TableMetadata.newTableMetadata(schema4IsNotSchema3Compat,
- unpartitionedPartitionSpec,
"tableLocationForSchema4WithUnpartitionedSpec", new HashMap<>());
-
-
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
- tableMetadataWithSchema4AndUnpartitionedSpec,
SCHEMA_MISMATCH_EXCEPTION);
- }
-
- @Test
- public void testValidateSchemaWithEvolvedSchemaIFails() {
- // Schema 3 has one more extra field as compared to Schema 1
-
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
- tableMetadataWithSchema3AndUnpartitionedSpec,
SCHEMA_MISMATCH_EXCEPTION);
- }
-
- @Test
- public void testValidateSchemaWithEvolvedSchemaIIFails() {
- // TODO: This test should pass in the future when we support schema
evolution
- // Schema 3 has one more extra field as compared to Schema 1
-
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
- tableMetadataWithSchema1AndUnpartitionedSpec,
SCHEMA_MISMATCH_EXCEPTION);
- }
-
- @Test
- public void testValidateOneSchemaEvolvedFromIntToLongTypeFails() {
- // Adding this test as to verify that partition copy doesn't proceed
further for this case
- // as while doing poc and testing had seen final commit gets fail if there
is mismatch in field type
- // specially from int to long
- Schema schema5EvolvedFromSchema4 =
AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema5")
- .fields()
- .requiredLong("field1")
- .requiredString("field2")
- .requiredInt("field3")
- .endRecord());
- PartitionSpec partitionSpec =
PartitionSpec.builderFor(schema5EvolvedFromSchema4)
- .identity("field1")
- .build();
- TableMetadata tableMetadataWithSchema5AndPartitionSpec =
TableMetadata.newTableMetadata(schema5EvolvedFromSchema4,
- partitionSpec, "tableLocationForSchema5WithPartitionSpec", new
HashMap<>());
-
-
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
- tableMetadataWithSchema5AndPartitionSpec, SCHEMA_MISMATCH_EXCEPTION);
- }
-
@Test
public void testValidateSamePartitionSpec() throws IOException {
IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index 90ba02b848..db3bc3257c 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -60,6 +61,8 @@ import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.hive.HiveMetastoreTest;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
+import org.apache.iceberg.types.Types;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
@@ -105,6 +108,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
private final String tableName = "justtesting";
private final String destTableName = "destTable";
private TableIdentifier tableId;
+ private TableIdentifier sourceTableId;
+ private TableIdentifier destTableId;
private Table table;
private String catalogUri;
private String metadataBasePath;
@@ -126,6 +131,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
@AfterMethod
public void cleanUpEachTest() {
catalog.dropTable(tableId);
+ catalog.dropTable(sourceTableId);
+ catalog.dropTable(destTableId);
}
/** Test to verify getCurrentSnapshotInfo, getAllSnapshotInfosIterator,
getIncrementalSnapshotInfosIterator for iceberg table containing only data
files.*/
@@ -154,6 +161,164 @@ public class IcebergTableTest extends HiveMetastoreTest {
}
}
+ @Test
+ public void schemaUpdateSuccessTest() throws
IcebergTable.TableNotFoundException {
+ // create source iceberg table with this schema
+ Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1,
"id", Types.LongType.get()),
+ Types.NestedField.required(2, "product_name", Types.StringType.get()),
+ Types.NestedField.required(3, "details",
+ Types.StructType.of(Types.NestedField.required(4, "description",
Types.StringType.get()),
+ Types.NestedField.required(6, "category",
Types.StringType.get()),
+ Types.NestedField.required(7, "remarks",
Types.StringType.get()))),
+ Types.NestedField.required(5, "price", Types.DecimalType.of(10, 2)));
+
+ PartitionSpec sourceIcebergPartitionSpec =
PartitionSpec.builderFor(sourceIcebergSchema)
+ .identity("id")
+ .build();
+
+ sourceTableId = TableIdentifier.of(dbName, tableName + "_source");
+ catalog.createTable(sourceTableId, sourceIcebergSchema,
sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+ // create destination iceberg table with this schema
+ Schema destIcebergSchema = new Schema(Types.NestedField.required(1, "id",
Types.LongType.get()));
+
+ PartitionSpec destIcebergPartitionSpec =
PartitionSpec.builderFor(destIcebergSchema)
+ .identity("id")
+ .build();
+
+ destTableId = TableIdentifier.of(dbName, tableName + "_dest");
+ catalog.createTable(destTableId, destIcebergSchema,
destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+ TableOperations destTableOps =
Mockito.spy(catalog.newTableOps(destTableId));
+ TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+ IcebergTable destIcebergTable = new IcebergTable(destTableId,
destTableOps, catalogUri, catalog.loadTable(tableId));
+
+ TableMetadata srcTableMetadata = sourceTableOps.current();
+ Schema srcTableSchema = srcTableMetadata.schema();
+
+ // update schema and verify that the schema update succeeds
+ destIcebergTable.updateSchema(srcTableSchema, false);
+ Mockito.verify(destTableOps, Mockito.times(1)).commit(Mockito.any(),
Mockito.any());
+ }
+
+ @Test
+ public void schemaUpdateTest_divergentSchema() throws
IcebergTable.TableNotFoundException {
+ // create source iceberg table with this schema
+ Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1,
"id", Types.LongType.get()),
+ Types.NestedField.required(2, "product_name", Types.StringType.get()),
+ Types.NestedField.required(3, "details",
+ Types.StructType.of(Types.NestedField.required(4, "description",
Types.StringType.get()),
+ Types.NestedField.required(6, "category",
Types.StringType.get()),
+ Types.NestedField.required(7, "remarks",
Types.StringType.get()))),
+ Types.NestedField.required(5, "price", Types.DecimalType.of(10, 2)));
+
+ PartitionSpec sourceIcebergPartitionSpec =
PartitionSpec.builderFor(sourceIcebergSchema)
+ .identity("id")
+ .build();
+
+ TableIdentifier sourceTableId = TableIdentifier.of(dbName, tableName +
"_source");
+ catalog.createTable(sourceTableId, sourceIcebergSchema,
sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+ // create destination iceberg table with this schema
+ Schema destIcebergSchema = new Schema(Types.NestedField.required(1,
"randomField", Types.LongType.get()));
+
+ PartitionSpec destIcebergPartitionSpec =
PartitionSpec.builderFor(destIcebergSchema)
+ .identity("randomField")
+ .build();
+
+ TableIdentifier destTableId = TableIdentifier.of(dbName, tableName +
"_dest");
+ catalog.createTable(destTableId, destIcebergSchema,
destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+ TableOperations destTableOps =
Mockito.spy(catalog.newTableOps(destTableId));
+ TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+ IcebergTable destIcebergTable = new IcebergTable(destTableId,
destTableOps, catalogUri, catalog.loadTable(destTableId));
+
+ TableMetadata srcTableMetadata = sourceTableOps.current();
+ Schema srcTableSchema = srcTableMetadata.schema();
+
+ // update schema and verify that the schema update succeeds
+ destIcebergTable.updateSchema(srcTableSchema, false);
+ Mockito.verify(destTableOps, Mockito.times(1)).commit(Mockito.any(),
Mockito.any());
+ }
+
+ @Test
+ public void schemaUpdateTest_sameSchema_noOpTest() throws
IcebergTable.TableNotFoundException {
+ // create source iceberg table with this schema
+ Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1,
"randomField", Types.LongType.get()));
+
+ PartitionSpec sourceIcebergPartitionSpec =
PartitionSpec.builderFor(sourceIcebergSchema)
+ .identity("randomField")
+ .build();
+
+ TableIdentifier sourceTableId = TableIdentifier.of(dbName, tableName +
"_source");
+ catalog.createTable(sourceTableId, sourceIcebergSchema,
sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+ // create destination iceberg table with this schema
+ Schema destIcebergSchema = new Schema(Types.NestedField.required(1,
"randomField", Types.LongType.get()));
+
+ PartitionSpec destIcebergPartitionSpec =
PartitionSpec.builderFor(destIcebergSchema)
+ .identity("randomField")
+ .build();
+
+ TableIdentifier destTableId = TableIdentifier.of(dbName, tableName +
"_dest");
+ catalog.createTable(destTableId, destIcebergSchema,
destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+ TableOperations destTableOps =
Mockito.spy(catalog.newTableOps(destTableId));
+ TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+ IcebergTable destIcebergTable = new IcebergTable(destTableId,
destTableOps, catalogUri, catalog.loadTable(destTableId));
+
+ TableMetadata srcTableMetadata = sourceTableOps.current();
+ Schema srcTableSchema = srcTableMetadata.schema();
+
+ // update with an identical schema and verify that no commit occurs
+ destIcebergTable.updateSchema(srcTableSchema, false);
+ Mockito.verify(destTableOps, Mockito.times(0)).commit(Mockito.any(),
Mockito.any());
+ }
+
+ @Test
+ public void schemaUpdate_onlyValidationTest() throws
IcebergTable.TableNotFoundException {
+ // create source iceberg table with this schema
+ Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1,
"id", Types.LongType.get()),
+ Types.NestedField.required(2, "product_name", Types.StringType.get()),
+ Types.NestedField.required(3, "details",
+ Types.StructType.of(Types.NestedField.required(4, "description",
Types.StringType.get()),
+ Types.NestedField.required(6, "category",
Types.StringType.get()),
+ Types.NestedField.required(7, "remarks",
Types.StringType.get()))),
+ Types.NestedField.required(5, "price", Types.DecimalType.of(10, 2)));
+
+ PartitionSpec sourceIcebergPartitionSpec =
PartitionSpec.builderFor(sourceIcebergSchema)
+ .identity("id")
+ .build();
+
+ sourceTableId = TableIdentifier.of(dbName, tableName + "_source");
+ catalog.createTable(sourceTableId, sourceIcebergSchema,
sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+ // create destination iceberg table with this schema
+ Schema destIcebergSchema = new Schema(Types.NestedField.required(1, "id",
Types.LongType.get()));
+
+ PartitionSpec destIcebergPartitionSpec =
PartitionSpec.builderFor(destIcebergSchema)
+ .identity("id")
+ .build();
+
+ destTableId = TableIdentifier.of(dbName, tableName + "_dest");
+ catalog.createTable(destTableId, destIcebergSchema,
destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+ TableOperations destTableOps =
Mockito.spy(catalog.newTableOps(destTableId));
+ TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+ IcebergTable destIcebergTable = new IcebergTable(destTableId,
destTableOps, catalogUri, catalog.loadTable(tableId));
+
+ TableMetadata srcTableMetadata = sourceTableOps.current();
+ Schema srcTableSchema = srcTableMetadata.schema();
+
+ // validate the schema update only and verify that no commit occurs
+ destIcebergTable.updateSchema(srcTableSchema, true);
+ Mockito.verify(destTableOps, Mockito.times(0)).commit(Mockito.any(),
Mockito.any());
+ }
+
@DataProvider(name = "isPosDeleteProvider")
public Object[][] isPosDeleteProvider() {
return new Object[][] {{true}, {false}};