This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 66e95ce792 Evolve iceberg table schema for partition copy (#4142)
66e95ce792 is described below

commit 66e95ce792f8dc43a70715e1135e2ece52dc44df
Author: thisisArjit <[email protected]>
AuthorDate: Fri Oct 24 18:08:35 2025 +0530

    Evolve iceberg table schema for partition copy (#4142)
---
 .../iceberg/IcebergOverwritePartitionsStep.java    |  56 +++++--
 .../copy/iceberg/IcebergPartitionDataset.java      |  13 +-
 .../iceberg/IcebergPartitionDatasetFinder.java     |   3 +
 .../data/management/copy/iceberg/IcebergTable.java |  29 ++++
 .../IcebergTableMetadataValidatorUtils.java        |  28 +---
 .../IcebergOverwritePartitionsStepTest.java        |   7 +-
 .../IcebergTableMetadataValidatorUtilsTest.java    |  78 ----------
 .../management/copy/iceberg/IcebergTableTest.java  | 165 +++++++++++++++++++++
 8 files changed, 258 insertions(+), 121 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
index 30dc2094f3..e7c83ee2d2 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.TableIdentifier;
 
 import com.github.rholder.retry.Attempt;
@@ -57,6 +58,14 @@ import static 
org.apache.gobblin.util.retry.RetryerFactory.RetryType;
 @Slf4j
 public class IcebergOverwritePartitionsStep implements CommitStep {
   private final String destTableIdStr;
+  /**
+   * Updated schema must be enforced to be backwards compatible by the catalog 
in
+   * the partition overwrite step, otherwise previous partitions may become 
unreadable.
+   * This schema is determined at the start of the copy job before data files 
discovery and the same is
+   * committed before overwriting partitions. It is the responsibility of the 
catalog to ensure that
+   * no conflicting commits happen in between the copy job.
+   */
+  private final Schema updatedSchema;
   private final Properties properties;
   // data files are populated once all the copy tasks are done. Each 
IcebergPartitionCopyableFile has a serialized data file
   @Setter private List<DataFile> dataFiles;
@@ -64,6 +73,8 @@ public class IcebergOverwritePartitionsStep implements 
CommitStep {
   private final String partitionValue;
   public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
       ".catalog.overwrite.partitions.retries";
+  public static final String SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
+      ".schema.update.retries";
   private static final Config RETRYER_FALLBACK_CONFIG = 
ConfigFactory.parseMap(ImmutableMap.of(
       RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L),
       RETRY_TIMES, 3,
@@ -75,8 +86,9 @@ public class IcebergOverwritePartitionsStep implements 
CommitStep {
    * @param destTableIdStr the identifier of the destination table as a string
    * @param properties the properties containing configuration
    */
-  public IcebergOverwritePartitionsStep(String destTableIdStr, String 
partitionColName, String partitionValue, Properties properties) {
+  public IcebergOverwritePartitionsStep(String destTableIdStr, Schema 
updatedSchema, String partitionColName, String partitionValue, Properties 
properties) {
     this.destTableIdStr = destTableIdStr;
+    this.updatedSchema = updatedSchema;
     this.partitionColName = partitionColName;
     this.partitionValue = partitionValue;
     this.properties = properties;
@@ -108,6 +120,13 @@ public class IcebergOverwritePartitionsStep implements 
CommitStep {
           dataFiles.size(),
           dataFiles.get(0).path()
       );
+      // update schema
+      Retryer<Void> schemaUpdateRetryer = createSchemaUpdateRetryer();
+      schemaUpdateRetryer.call(() -> {
+        destTable.updateSchema(this.updatedSchema, false);
+        return null;
+      });
+      // overwrite partitions
       Retryer<Void> overwritePartitionsRetryer = 
createOverwritePartitionsRetryer();
       overwritePartitionsRetryer.call(() -> {
         destTable.overwritePartition(dataFiles, this.partitionColName, 
this.partitionValue);
@@ -146,17 +165,30 @@ public class IcebergOverwritePartitionsStep implements 
CommitStep {
         ? 
config.getConfig(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX)
         : ConfigFactory.empty();
 
+    return getRetryer(retryerOverridesConfig);
+  }
+
+  private Retryer<Void> createSchemaUpdateRetryer() {
+    Config config = ConfigFactory.parseProperties(this.properties);
+    Config retryerOverridesConfig = 
config.hasPath(IcebergOverwritePartitionsStep.SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX)
+        ? 
config.getConfig(IcebergOverwritePartitionsStep.SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX)
+        : ConfigFactory.empty();
+
+    return getRetryer(retryerOverridesConfig);
+  }
+
+  private Retryer<Void> getRetryer(Config retryerOverridesConfig) {
     return 
RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG),
 Optional.of(new RetryListener() {
-      @Override
-      public <V> void onRetry(Attempt<V> attempt) {
-        if (attempt.hasException()) {
-          String msg = String.format("~%s~ Exception while overwriting 
partitions [attempt: %d; elapsed: %s]",
-              destTableIdStr,
-              attempt.getAttemptNumber(),
-              
Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
-          log.warn(msg, attempt.getExceptionCause());
-        }
-      }
-    }));
+          @Override
+          public <V> void onRetry(Attempt<V> attempt) {
+            if (attempt.hasException()) {
+              String msg = String.format("~%s~ Exception occurred [attempt: 
%d; elapsed: %s]",
+                  destTableIdStr,
+                  attempt.getAttemptNumber(),
+                  
Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
+              log.warn(msg, attempt.getExceptionCause());
+            }
+          }
+        }));
   }
 }
\ No newline at end of file
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
index 5600b47f08..1cc15c1ec1 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableProperties;
@@ -98,6 +99,7 @@ public class IcebergPartitionDataset extends IcebergDataset {
     //  Differences are getting data files, copying ancestor permission and 
adding post publish steps
     String fileSet = this.getFileSetId();
     IcebergTable srcIcebergTable = getSrcIcebergTable();
+    Schema srcTableSchema = srcIcebergTable.accessTableMetadata().schema();
     List<DataFile> srcDataFiles = 
srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
 
     if (srcDataFiles.isEmpty()) {
@@ -128,7 +130,7 @@ public class IcebergPartitionDataset extends IcebergDataset 
{
     List<CopyEntity> copyEntities = getIcebergParitionCopyEntities(targetFs, 
srcDataFiles, srcWriteDataLocation, destWriteDataLocation, partitionSpec, 
copyConfig);
     // Adding this check to avoid adding post publish step when there are no 
files to copy.
     if (CollectionUtils.isNotEmpty(copyEntities)) {
-      copyEntities.add(createOverwritePostPublishStep());
+      copyEntities.add(createOverwritePostPublishStep(srcTableSchema));
     }
 
     log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
@@ -191,9 +193,16 @@ public class IcebergPartitionDataset extends 
IcebergDataset {
     return new Path(fileDir, newFileName);
   }
 
-  private PostPublishStep createOverwritePostPublishStep() {
+  /**
+   * Creates a {@link PostPublishStep} for overwriting partitions in the 
destination Iceberg table.
+   * @param srcTableSchema Schema of the source Iceberg table which needs to 
be passed to the
+   *                       overwrite step for updating destination table 
schema.
+   * @return a {@link PostPublishStep} that performs the partition overwrite 
operation.
+   */
+  private PostPublishStep createOverwritePostPublishStep(Schema 
srcTableSchema) {
     IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new 
IcebergOverwritePartitionsStep(
         this.getDestIcebergTable().getTableId().toString(),
+        srcTableSchema,
         this.partitionColumnName,
         this.partitionColValue,
         this.properties
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
index 3d3e5c1917..2bcc13d14b 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
@@ -53,6 +53,9 @@ public class IcebergPartitionDatasetFinder extends 
IcebergDatasetFinder {
     boolean validateStrictPartitionEquality = 
Boolean.parseBoolean(properties.getProperty(ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY,
         DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY));
 
+    // This method only validates if the schema can be updated, no commit is 
performed here
+    
destIcebergTable.updateSchema(srcIcebergTable.accessTableMetadata().schema(), 
true);
+
     IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
         srcIcebergTable.accessTableMetadata(), 
destIcebergTable.accessTableMetadata(), validateStrictPartitionEquality);
 
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index fab570120b..0f6c371e0e 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -48,6 +49,7 @@ import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -315,4 +317,31 @@ public class IcebergTable {
     log.info("~{}~ SnapshotId after overwrite: {}", tableId, 
accessTableMetadata().currentSnapshot().snapshotId());
   }
 
+  /**
+   * update table's schema to the provided {@link Schema}
+   * @param updatedSchema the updated schema to be set on the table.
+   * @param onlyValidate if true, only validates that the schema can be 
updated, without committing.
+   * @throws TableNotFoundException if the table does not exist.
+   */
+  protected void updateSchema(Schema updatedSchema, boolean onlyValidate) 
throws TableNotFoundException {
+    TableMetadata currentTableMetadata = accessTableMetadata();
+    Schema currentSchema = currentTableMetadata.schema();
+
+    if (currentSchema.sameSchema(updatedSchema)) {
+      log.info("~{}~ schema is already up-to-date", tableId);
+      return;
+    }
+
+    log.info("~{}~ updating schema from {} to {}, commit: {}", tableId, 
currentSchema, updatedSchema, !onlyValidate);
+
+    TableMetadata updatedTableMetadata = 
currentTableMetadata.updateSchema(updatedSchema, 
updatedSchema.highestFieldId());
+    
Preconditions.checkArgument(updatedTableMetadata.schema().sameSchema(updatedSchema),
 "Schema mismatch after update, please check destination table");
+
+    if (!onlyValidate) {
+      tableOps.commit(currentTableMetadata, updatedTableMetadata);
+      tableOps.refresh();
+      log.info("~{}~ schema updated successfully", tableId);
+    }
+  }
+
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
index 7c47f2dfb8..90dc90c412 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.data.management.copy.iceberg;
 import java.io.IOException;
 
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableMetadata;
 
 import lombok.extern.slf4j.Slf4j;
@@ -36,11 +35,7 @@ public class IcebergTableMetadataValidatorUtils {
   }
 
   /**
-   * Compares the metadata of the given two iceberg tables.
-   * <ul>
-   *   <li>First compares the schema of the metadata.</li>
-   *   <li>Then compares the partition spec of the metadata.</li>
-   * </ul>
+   * Compares the partition spec of the metadata.
    * @param tableMetadataA  the metadata of the first table
    * @param tableMetadataB the metadata of the second table
    * @param validateStrictPartitionEquality boolean value to control 
strictness of partition spec comparison
@@ -52,27 +47,6 @@ public class IcebergTableMetadataValidatorUtils {
         tableMetadataA.metadataFileLocation(),
         tableMetadataB.metadataFileLocation());
 
-    Schema schemaA = tableMetadataA.schema();
-    Schema schemaB = tableMetadataB.schema();
-    // TODO: Need to add support for schema evolution
-    //  This check needs to be broken down into multiple checks to support 
schema evolution
-    //  Possible cases - schemaA == schemaB,
-    //  - schemaA is subset of schemaB [ schemaB Evolved ],
-    //  - schemaA is superset of schemaB [ schemaA Evolved ],
-    //  - Other cases?
-    //  Also consider using Strategy or any other design pattern for this to 
make it a better solution
-    if (!schemaA.sameSchema(schemaB)) {
-      String errMsg = String.format(
-          "Schema Mismatch between Metadata{%s} - SchemaId{%d} and 
Metadata{%s} - SchemaId{%d}",
-          tableMetadataA.metadataFileLocation(),
-          schemaA.schemaId(),
-          tableMetadataB.metadataFileLocation(),
-          schemaB.schemaId()
-      );
-      log.error(errMsg);
-      throw new IOException(errMsg);
-    }
-
     PartitionSpec partitionSpecA = tableMetadataA.spec();
     PartitionSpec partitionSpecB = tableMetadataB.spec();
     // .compatibleWith() doesn't match for specId of partition spec and 
fieldId of partition fields while .equals() does
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
index e4c1774e9a..634946ffd7 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
@@ -24,6 +24,7 @@ import java.util.Properties;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -40,6 +41,7 @@ public class IcebergOverwritePartitionsStepTest {
   private final String testPartitionColName = "testPartition";
   private final String testPartitionColValue = "testValue";
   private IcebergTable mockIcebergTable;
+  private Schema mockSchema;
   private IcebergCatalog mockIcebergCatalog;
   private Properties mockProperties;
   private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep;
@@ -47,10 +49,11 @@ public class IcebergOverwritePartitionsStepTest {
   @BeforeMethod
   public void setUp() throws IOException {
     mockIcebergTable = Mockito.mock(IcebergTable.class);
+    mockSchema = Mockito.mock(Schema.class);
     mockIcebergCatalog = Mockito.mock(IcebergCatalog.class);
     mockProperties = new Properties();
 
-    spyIcebergOverwritePartitionsStep = Mockito.spy(new 
IcebergOverwritePartitionsStep(destTableIdStr,
+    spyIcebergOverwritePartitionsStep = Mockito.spy(new 
IcebergOverwritePartitionsStep(destTableIdStr, mockSchema,
         testPartitionColName, testPartitionColValue, mockProperties));
 
     spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());
@@ -112,7 +115,7 @@ public class IcebergOverwritePartitionsStepTest {
     int retryCount = 7;
     
mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX
 + "." + RETRY_TIMES,
         Integer.toString(retryCount));
-    spyIcebergOverwritePartitionsStep = Mockito.spy(new 
IcebergOverwritePartitionsStep(destTableIdStr,
+    spyIcebergOverwritePartitionsStep = Mockito.spy(new 
IcebergOverwritePartitionsStep(destTableIdStr, mockSchema,
         testPartitionColName, testPartitionColValue, mockProperties));
 
     spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
index 2cbde2df95..4e9718653f 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
@@ -35,23 +35,6 @@ public class IcebergTableMetadataValidatorUtilsTest {
       .requiredString("field1")
       .requiredString("field2")
       .endRecord());
-  private static final Schema schema2IsNotSchema1Compat = 
AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema2")
-      .fields()
-      .requiredString("field2")
-      .requiredString("field1")
-      .endRecord());
-  private static final Schema schema3 = 
AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema3")
-      .fields()
-      .requiredString("field1")
-      .requiredString("field2")
-      .requiredInt("field3")
-      .endRecord());
-  private static final Schema schema4IsNotSchema3Compat = 
AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema4")
-      .fields()
-      .requiredInt("field1")
-      .requiredString("field2")
-      .requiredInt("field3")
-      .endRecord());
   private static final PartitionSpec partitionSpec1 = 
PartitionSpec.builderFor(schema1)
       .identity("field1")
       .build();
@@ -59,9 +42,6 @@ public class IcebergTableMetadataValidatorUtilsTest {
       schema1, unpartitionedPartitionSpec, 
"tableLocationForSchema1WithUnpartitionedSpec", new HashMap<>());
   private static final TableMetadata tableMetadataWithSchema1AndPartitionSpec1 
= TableMetadata.newTableMetadata(
       schema1, partitionSpec1, "tableLocationForSchema1WithPartitionSpec1", 
new HashMap<>());
-  private static final TableMetadata 
tableMetadataWithSchema3AndUnpartitionedSpec = TableMetadata.newTableMetadata(
-      schema3, unpartitionedPartitionSpec, 
"tableLocationForSchema3WithUnpartitionedSpec", new HashMap<>());
-  private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch 
between Metadata";
   private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition 
Spec Mismatch between Metadata";
   private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_TRUE = true;
   private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_FALSE = 
false;
@@ -74,64 +54,6 @@ public class IcebergTableMetadataValidatorUtilsTest {
     Assert.assertTrue(true);
   }
 
-  @Test
-  public void testValidateDifferentSchemaFails() {
-    // Schema 1 and Schema 2 have different field order
-
-    TableMetadata tableMetadataWithSchema2AndUnpartitionedSpec = 
TableMetadata.newTableMetadata(schema2IsNotSchema1Compat,
-        unpartitionedPartitionSpec, 
"tableLocationForSchema2WithUnpartitionedSpec", new HashMap<>());
-
-    
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
-        tableMetadataWithSchema2AndUnpartitionedSpec, 
SCHEMA_MISMATCH_EXCEPTION);
-  }
-
-  @Test
-  public void testValidateSchemaWithDifferentTypesFails() {
-    // schema 3 and schema 4 have different field types for field1
-
-    TableMetadata tableMetadataWithSchema4AndUnpartitionedSpec = 
TableMetadata.newTableMetadata(schema4IsNotSchema3Compat,
-        unpartitionedPartitionSpec, 
"tableLocationForSchema4WithUnpartitionedSpec", new HashMap<>());
-
-    
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
-        tableMetadataWithSchema4AndUnpartitionedSpec, 
SCHEMA_MISMATCH_EXCEPTION);
-  }
-
-  @Test
-  public void testValidateSchemaWithEvolvedSchemaIFails() {
-    // Schema 3 has one more extra field as compared to Schema 1
-    
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
-        tableMetadataWithSchema3AndUnpartitionedSpec, 
SCHEMA_MISMATCH_EXCEPTION);
-  }
-
-  @Test
-  public void testValidateSchemaWithEvolvedSchemaIIFails() {
-    // TODO: This test should pass in the future when we support schema 
evolution
-    // Schema 3 has one more extra field as compared to Schema 1
-    
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
-        tableMetadataWithSchema1AndUnpartitionedSpec, 
SCHEMA_MISMATCH_EXCEPTION);
-  }
-
-  @Test
-  public void testValidateOneSchemaEvolvedFromIntToLongTypeFails() {
-    // Adding this test as to verify that partition copy doesn't proceed 
further for this case
-    // as while doing poc and testing had seen final commit gets fail if there 
is mismatch in field type
-    // specially from int to long
-    Schema schema5EvolvedFromSchema4 = 
AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema5")
-        .fields()
-        .requiredLong("field1")
-        .requiredString("field2")
-        .requiredInt("field3")
-        .endRecord());
-    PartitionSpec partitionSpec = 
PartitionSpec.builderFor(schema5EvolvedFromSchema4)
-        .identity("field1")
-        .build();
-    TableMetadata tableMetadataWithSchema5AndPartitionSpec = 
TableMetadata.newTableMetadata(schema5EvolvedFromSchema4,
-        partitionSpec, "tableLocationForSchema5WithPartitionSpec", new 
HashMap<>());
-
-    
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
-        tableMetadataWithSchema5AndPartitionSpec, SCHEMA_MISMATCH_EXCEPTION);
-  }
-
   @Test
   public void testValidateSamePartitionSpec() throws IOException {
     IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index 90ba02b848..db3bc3257c 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -60,6 +61,8 @@ import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.hive.HiveMetastoreTest;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
+import org.apache.iceberg.types.Types;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -105,6 +108,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
   private final String tableName = "justtesting";
   private final String destTableName = "destTable";
   private TableIdentifier tableId;
+  private TableIdentifier sourceTableId;
+  private TableIdentifier destTableId;
   private Table table;
   private String catalogUri;
   private String metadataBasePath;
@@ -126,6 +131,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
   @AfterMethod
   public void cleanUpEachTest() {
     catalog.dropTable(tableId);
+    catalog.dropTable(sourceTableId);
+    catalog.dropTable(destTableId);
   }
 
   /** Test to verify getCurrentSnapshotInfo, getAllSnapshotInfosIterator, 
getIncrementalSnapshotInfosIterator for iceberg table containing only data 
files.*/
@@ -154,6 +161,164 @@ public class IcebergTableTest extends HiveMetastoreTest {
     }
   }
 
+  @Test
+  public void schemaUpdateSuccessTest() throws 
IcebergTable.TableNotFoundException {
+    // create source iceberg table with this schema
+    Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1, 
"id", Types.LongType.get()),
+        Types.NestedField.required(2, "product_name", Types.StringType.get()),
+        Types.NestedField.required(3, "details",
+            Types.StructType.of(Types.NestedField.required(4, "description", 
Types.StringType.get()),
+                Types.NestedField.required(6, "category", 
Types.StringType.get()),
+                Types.NestedField.required(7, "remarks", 
Types.StringType.get()))),
+        Types.NestedField.required(5, "price", Types.DecimalType.of(10, 2)));
+
+    PartitionSpec sourceIcebergPartitionSpec = 
PartitionSpec.builderFor(sourceIcebergSchema)
+        .identity("id")
+        .build();
+
+    sourceTableId = TableIdentifier.of(dbName, tableName + "_source");
+    catalog.createTable(sourceTableId, sourceIcebergSchema, 
sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    // create destination iceberg table with this schema
+    Schema destIcebergSchema = new Schema(Types.NestedField.required(1, "id", 
Types.LongType.get()));
+
+    PartitionSpec destIcebergPartitionSpec = 
PartitionSpec.builderFor(destIcebergSchema)
+        .identity("id")
+        .build();
+
+    destTableId = TableIdentifier.of(dbName, tableName + "_dest");
+    catalog.createTable(destTableId, destIcebergSchema, 
destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    TableOperations destTableOps = 
Mockito.spy(catalog.newTableOps(destTableId));
+    TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+    IcebergTable destIcebergTable = new IcebergTable(destTableId, 
destTableOps, catalogUri, catalog.loadTable(tableId));
+
+    TableMetadata srcTableMetadata = sourceTableOps.current();
+    Schema srcTableSchema = srcTableMetadata.schema();
+
+    // update schema to verify that the schema update succeeds
+    destIcebergTable.updateSchema(srcTableSchema, false);
+    Mockito.verify(destTableOps, Mockito.times(1)).commit(Mockito.any(), 
Mockito.any());
+  }
+
+  @Test
+  public void schemaUpdateTest_divergentSchema() throws 
IcebergTable.TableNotFoundException {
+    // create source iceberg table with this schema
+    Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1, 
"id", Types.LongType.get()),
+        Types.NestedField.required(2, "product_name", Types.StringType.get()),
+        Types.NestedField.required(3, "details",
+            Types.StructType.of(Types.NestedField.required(4, "description", 
Types.StringType.get()),
+                Types.NestedField.required(6, "category", 
Types.StringType.get()),
+                Types.NestedField.required(7, "remarks", 
Types.StringType.get()))),
+        Types.NestedField.required(5, "price", Types.DecimalType.of(10, 2)));
+
+    PartitionSpec sourceIcebergPartitionSpec = 
PartitionSpec.builderFor(sourceIcebergSchema)
+        .identity("id")
+        .build();
+
+    TableIdentifier sourceTableId = TableIdentifier.of(dbName, tableName + 
"_source");
+    catalog.createTable(sourceTableId, sourceIcebergSchema, 
sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    // create destination iceberg table with this schema
+    Schema destIcebergSchema = new Schema(Types.NestedField.required(1, 
"randomField", Types.LongType.get()));
+
+    PartitionSpec destIcebergPartitionSpec = 
PartitionSpec.builderFor(destIcebergSchema)
+        .identity("randomField")
+        .build();
+
+    TableIdentifier destTableId = TableIdentifier.of(dbName, tableName + 
"_dest");
+    catalog.createTable(destTableId, destIcebergSchema, 
destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    TableOperations destTableOps = 
Mockito.spy(catalog.newTableOps(destTableId));
+    TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+    IcebergTable destIcebergTable = new IcebergTable(destTableId, 
destTableOps, catalogUri, catalog.loadTable(destTableId));
+
+    TableMetadata srcTableMetadata = sourceTableOps.current();
+    Schema srcTableSchema = srcTableMetadata.schema();
+
+    // update schema to verify that the schema update succeeds
+    destIcebergTable.updateSchema(srcTableSchema, false);
+    Mockito.verify(destTableOps, Mockito.times(1)).commit(Mockito.any(), 
Mockito.any());
+  }
+
+  @Test
+  public void schemaUpdateTest_sameSchema_noOpTest() throws 
IcebergTable.TableNotFoundException {
+    // create source iceberg table with this schema
+    Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1, 
"randomField", Types.LongType.get()));
+
+    PartitionSpec sourceIcebergPartitionSpec = 
PartitionSpec.builderFor(sourceIcebergSchema)
+        .identity("randomField")
+        .build();
+
+    TableIdentifier sourceTableId = TableIdentifier.of(dbName, tableName + 
"_source");
+    catalog.createTable(sourceTableId, sourceIcebergSchema, 
sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    // create destination iceberg table with this schema
+    Schema destIcebergSchema = new Schema(Types.NestedField.required(1, 
"randomField", Types.LongType.get()));
+
+    PartitionSpec destIcebergPartitionSpec = 
PartitionSpec.builderFor(destIcebergSchema)
+        .identity("randomField")
+        .build();
+
+    TableIdentifier destTableId = TableIdentifier.of(dbName, tableName + 
"_dest");
+    catalog.createTable(destTableId, destIcebergSchema, 
destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    TableOperations destTableOps = 
Mockito.spy(catalog.newTableOps(destTableId));
+    TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+    IcebergTable destIcebergTable = new IcebergTable(destTableId, 
destTableOps, catalogUri, catalog.loadTable(destTableId));
+
+    TableMetadata srcTableMetadata = sourceTableOps.current();
+    Schema srcTableSchema = srcTableMetadata.schema();
+
+    // update with an identical schema to verify that no commit is performed (no-op)
+    destIcebergTable.updateSchema(srcTableSchema, false);
+    Mockito.verify(destTableOps, Mockito.times(0)).commit(Mockito.any(), 
Mockito.any());
+  }
+
+  @Test
+  public void schemaUpdate_onlyValidationTest() throws 
IcebergTable.TableNotFoundException {
+    // create source iceberg table with this schema
+    Schema sourceIcebergSchema = new Schema(Types.NestedField.required(1, 
"id", Types.LongType.get()),
+        Types.NestedField.required(2, "product_name", Types.StringType.get()),
+        Types.NestedField.required(3, "details",
+            Types.StructType.of(Types.NestedField.required(4, "description", 
Types.StringType.get()),
+                Types.NestedField.required(6, "category", 
Types.StringType.get()),
+                Types.NestedField.required(7, "remarks", 
Types.StringType.get()))),
+        Types.NestedField.required(5, "price", Types.DecimalType.of(10, 2)));
+
+    PartitionSpec sourceIcebergPartitionSpec = 
PartitionSpec.builderFor(sourceIcebergSchema)
+        .identity("id")
+        .build();
+
+    sourceTableId = TableIdentifier.of(dbName, tableName + "_source");
+    catalog.createTable(sourceTableId, sourceIcebergSchema, 
sourceIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    // create destination iceberg table with this schema
+    Schema destIcebergSchema = new Schema(Types.NestedField.required(1, "id", 
Types.LongType.get()));
+
+    PartitionSpec destIcebergPartitionSpec = 
PartitionSpec.builderFor(destIcebergSchema)
+        .identity("id")
+        .build();
+
+    destTableId = TableIdentifier.of(dbName, tableName + "_dest");
+    catalog.createTable(destTableId, destIcebergSchema, 
destIcebergPartitionSpec, Collections.singletonMap("format-version", "2"));
+
+    TableOperations destTableOps = 
Mockito.spy(catalog.newTableOps(destTableId));
+    TableOperations sourceTableOps = catalog.newTableOps(sourceTableId);
+
+    IcebergTable destIcebergTable = new IcebergTable(destTableId, 
destTableOps, catalogUri, catalog.loadTable(tableId));
+
+    TableMetadata srcTableMetadata = sourceTableOps.current();
+    Schema srcTableSchema = srcTableMetadata.schema();
+
+    // validate-only schema update: verify that no commit is performed
+    destIcebergTable.updateSchema(srcTableSchema, true);
+    Mockito.verify(destTableOps, Mockito.times(0)).commit(Mockito.any(), 
Mockito.any());
+  }
+
   @DataProvider(name = "isPosDeleteProvider")
   public Object[][] isPosDeleteProvider() {
     return new Object[][] {{true}, {false}};


Reply via email to