This is an automated email from the ASF dual-hosted git repository.

kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b4e4d4a59c [GOBBLIN-2163] Add Iceberg-Distcp table metadata validation 
(for partition copy) (#4064)
b4e4d4a59c is described below

commit b4e4d4a59c6311bc5b34d6092f09b170319c007c
Author: Vivek Rai <[email protected]>
AuthorDate: Tue Oct 29 09:38:28 2024 +0530

    [GOBBLIN-2163] Add Iceberg-Distcp table metadata validation (for partition 
copy) (#4064)
---
 .../iceberg/IcebergPartitionDatasetFinder.java     |  10 +-
 .../IcebergTableMetadataValidatorUtils.java        |  97 ++++++++++
 .../IcebergTableMetadataValidatorUtilsTest.java    | 202 +++++++++++++++++++++
 3 files changed, 308 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
index 581a265e38..3d3e5c1917 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
@@ -38,6 +38,9 @@ import lombok.extern.slf4j.Slf4j;
 public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder {
   public static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
   public static final String ICEBERG_PARTITION_VALUE_KEY = "partition.value";
+  public static final String 
ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY = ICEBERG_DATASET_PREFIX + 
"partition.validate.strict.equality";
+  // when true, requires equality of the partitions' specId as well as the 
partitions' fields' fieldId
+  public static final String 
DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY= "true";
 
   public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties 
properties) {
     super(sourceFs, properties);
@@ -46,7 +49,12 @@ public class IcebergPartitionDatasetFinder extends 
IcebergDatasetFinder {
   @Override
   protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, 
IcebergTable destIcebergTable,
       Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) 
throws IOException {
-//    TODO: Add Validator for source and destination tables later
+
+    boolean validateStrictPartitionEquality = 
Boolean.parseBoolean(properties.getProperty(ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY,
+        DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY));
+
+    IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
+        srcIcebergTable.accessTableMetadata(), 
destIcebergTable.accessTableMetadata(), validateStrictPartitionEquality);
 
     String partitionColumnName = getLocationQualifiedProperty(properties, 
IcebergDatasetFinder.CatalogLocation.SOURCE,
         ICEBERG_PARTITION_NAME_KEY);
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
new file mode 100644
index 0000000000..7c47f2dfb8
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Static utility for validating Iceberg table metadata, ensuring that the two given
+ * tables' metadata have the same schema and a matching partition spec.
+ */
+@Slf4j
+public class IcebergTableMetadataValidatorUtils {
+
+  private IcebergTableMetadataValidatorUtils() {
+    // Static utility class - do not instantiate
+  }
+
+  /**
+   * Compares the metadata of the given two iceberg tables and fails on the first mismatch.
+   * <ul>
+   *   <li>First compares the schema of the metadata, via {@code Schema.sameSchema}.</li>
+   *   <li>Then compares the partition spec of the metadata, via {@code PartitionSpec.equals}
+   *   when {@code validateStrictPartitionEquality} is {@code true} and via
+   *   {@code PartitionSpec.compatibleWith} otherwise.</li>
+   * </ul>
+   * Because the schema check runs first, a schema mismatch is thrown before the partition
+   * specs are read. Every mismatch is logged at error level and then thrown with the same
+   * message, which names both metadata file locations and both schema / partition spec ids.
+   * The start and the successful end of the comparison are logged at info level.
+   * @param tableMetadataA  the metadata of the first table
+   * @param tableMetadataB the metadata of the second table
+   * @param validateStrictPartitionEquality {@code true} to also require matching specId and
+   *   partition-field fieldId (strict equality); {@code false} to accept compatible specs
+   * @throws IOException if the schemas or partition spec do not match
+   */
+  public static void failUnlessCompatibleStructure(TableMetadata 
tableMetadataA,
+      TableMetadata tableMetadataB, boolean validateStrictPartitionEquality) 
throws IOException {
+    log.info("Starting comparison between iceberg tables with metadata file 
location : {} and {}",
+        tableMetadataA.metadataFileLocation(),
+        tableMetadataB.metadataFileLocation());
+
+    Schema schemaA = tableMetadataA.schema();
+    Schema schemaB = tableMetadataB.schema();
+    // TODO: Need to add support for schema evolution
+    //  This check needs to be broken down into multiple checks to support 
schema evolution
+    //  Possible cases - schemaA == schemaB,
+    //  - schemaA is subset of schemaB [ schemaB Evolved ],
+    //  - schemaA is superset of schemaB [ schemaA Evolved ],
+    //  - Other cases?
+    //  Also consider using Strategy or any other design pattern for this to 
make it a better solution
+    if (!schemaA.sameSchema(schemaB)) {
+      String errMsg = String.format(
+          "Schema Mismatch between Metadata{%s} - SchemaId{%d} and 
Metadata{%s} - SchemaId{%d}",
+          tableMetadataA.metadataFileLocation(),
+          schemaA.schemaId(),
+          tableMetadataB.metadataFileLocation(),
+          schemaB.schemaId()
+      );
+      log.error(errMsg);
+      throw new IOException(errMsg);
+    }
+
+    PartitionSpec partitionSpecA = tableMetadataA.spec();
+    PartitionSpec partitionSpecB = tableMetadataB.spec();
+    // .compatibleWith() doesn't match for specId of partition spec and 
fieldId of partition fields while .equals() does
+    boolean partitionSpecMatch = validateStrictPartitionEquality ? 
partitionSpecA.equals(partitionSpecB)
+        : partitionSpecA.compatibleWith(partitionSpecB);
+    if (!partitionSpecMatch) {
+      String errMsg = String.format(
+          "Partition Spec Mismatch between Metadata{%s} - PartitionSpecId{%d} 
and Metadata{%s} - PartitionSpecId{%d}",
+          tableMetadataA.metadataFileLocation(),
+          partitionSpecA.specId(),
+          tableMetadataB.metadataFileLocation(),
+          partitionSpecB.specId()
+      );
+      log.error(errMsg);
+      throw new IOException(errMsg);
+    }
+
+    log.info("Comparison completed successfully between iceberg tables with 
metadata file location : {} and {}",
+        tableMetadataA.metadataFileLocation(),
+        tableMetadataB.metadataFileLocation());
+  }
+}
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
new file mode 100644
index 0000000000..2cbde2df95
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class IcebergTableMetadataValidatorUtilsTest { // tests for IcebergTableMetadataValidatorUtils#failUnlessCompatibleStructure
+  private static final PartitionSpec unpartitionedPartitionSpec = 
PartitionSpec.unpartitioned();
+  private static final Schema schema1 = 
AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema1")
+      .fields()
+      .requiredString("field1")
+      .requiredString("field2")
+      .endRecord());
+  private static final Schema schema2IsNotSchema1Compat = 
AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema2")
+      .fields()
+      .requiredString("field2")
+      .requiredString("field1")
+      .endRecord());
+  private static final Schema schema3 = 
AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema3")
+      .fields()
+      .requiredString("field1")
+      .requiredString("field2")
+      .requiredInt("field3")
+      .endRecord());
+  private static final Schema schema4IsNotSchema3Compat = 
AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema4")
+      .fields()
+      .requiredInt("field1")
+      .requiredString("field2")
+      .requiredInt("field3")
+      .endRecord());
+  private static final PartitionSpec partitionSpec1 = 
PartitionSpec.builderFor(schema1)
+      .identity("field1")
+      .build();
+  private static final TableMetadata 
tableMetadataWithSchema1AndUnpartitionedSpec = TableMetadata.newTableMetadata(
+      schema1, unpartitionedPartitionSpec, 
"tableLocationForSchema1WithUnpartitionedSpec", new HashMap<>());
+  private static final TableMetadata tableMetadataWithSchema1AndPartitionSpec1 
= TableMetadata.newTableMetadata(
+      schema1, partitionSpec1, "tableLocationForSchema1WithPartitionSpec1", 
new HashMap<>());
+  private static final TableMetadata 
tableMetadataWithSchema3AndUnpartitionedSpec = TableMetadata.newTableMetadata(
+      schema3, unpartitionedPartitionSpec, 
"tableLocationForSchema3WithUnpartitionedSpec", new HashMap<>());
+  private static final String SCHEMA_MISMATCH_EXCEPTION = "Schema Mismatch 
between Metadata";
+  private static final String PARTITION_SPEC_MISMATCH_EXCEPTION = "Partition 
Spec Mismatch between Metadata";
+  private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_TRUE = true;
+  private static final boolean VALIDATE_STRICT_PARTITION_EQUALITY_FALSE = 
false;
+  @Test
+  public void testValidateSameSchema() throws IOException {
+    IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
+        tableMetadataWithSchema1AndUnpartitionedSpec, 
tableMetadataWithSchema1AndUnpartitionedSpec,
+        VALIDATE_STRICT_PARTITION_EQUALITY_TRUE
+    );
+    Assert.assertTrue(true); // no-op: reaching this line means no IOException was thrown above
+  }
+
+  @Test
+  public void testValidateDifferentSchemaFails() {
+    // schema1 and schema2IsNotSchema1Compat declare the same two fields in opposite order
+
+    TableMetadata tableMetadataWithSchema2AndUnpartitionedSpec = 
TableMetadata.newTableMetadata(schema2IsNotSchema1Compat,
+        unpartitionedPartitionSpec, 
"tableLocationForSchema2WithUnpartitionedSpec", new HashMap<>());
+
+    
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
+        tableMetadataWithSchema2AndUnpartitionedSpec, 
SCHEMA_MISMATCH_EXCEPTION);
+  }
+
+  @Test
+  public void testValidateSchemaWithDifferentTypesFails() {
+    // schema3 declares field1 as a string while schema4IsNotSchema3Compat declares it as an int
+
+    TableMetadata tableMetadataWithSchema4AndUnpartitionedSpec = 
TableMetadata.newTableMetadata(schema4IsNotSchema3Compat,
+        unpartitionedPartitionSpec, 
"tableLocationForSchema4WithUnpartitionedSpec", new HashMap<>());
+
+    
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
+        tableMetadataWithSchema4AndUnpartitionedSpec, 
SCHEMA_MISMATCH_EXCEPTION);
+  }
+
+  @Test
+  public void testValidateSchemaWithEvolvedSchemaIFails() {
+    // schema3 has one extra field (field3) compared to schema1; here the schema1 table is passed first
+    
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
+        tableMetadataWithSchema3AndUnpartitionedSpec, 
SCHEMA_MISMATCH_EXCEPTION);
+  }
+
+  @Test
+  public void testValidateSchemaWithEvolvedSchemaIIFails() {
+    // TODO: This test should pass in the future when we support schema 
evolution
+    // schema3 has one extra field (field3) compared to schema1; here the schema3 table is passed first
+    
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema3AndUnpartitionedSpec,
+        tableMetadataWithSchema1AndUnpartitionedSpec, 
SCHEMA_MISMATCH_EXCEPTION);
+  }
+
+  @Test
+  public void testValidateOneSchemaEvolvedFromIntToLongTypeFails() {
+    // Added to verify that partition copy doesn't proceed 
further for this case
+    // because during PoC testing the final commit was seen to fail if there 
is mismatch in field type
+    // especially from int to long. NOTE(review): the first table below is built from schema1, not schema4 - confirm intent
+    Schema schema5EvolvedFromSchema4 = 
AvroSchemaUtil.toIceberg(SchemaBuilder.record("schema5")
+        .fields()
+        .requiredLong("field1")
+        .requiredString("field2")
+        .requiredInt("field3")
+        .endRecord());
+    PartitionSpec partitionSpec = 
PartitionSpec.builderFor(schema5EvolvedFromSchema4)
+        .identity("field1")
+        .build();
+    TableMetadata tableMetadataWithSchema5AndPartitionSpec = 
TableMetadata.newTableMetadata(schema5EvolvedFromSchema4,
+        partitionSpec, "tableLocationForSchema5WithPartitionSpec", new 
HashMap<>());
+
+    
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
+        tableMetadataWithSchema5AndPartitionSpec, SCHEMA_MISMATCH_EXCEPTION);
+  }
+
+  @Test
+  public void testValidateSamePartitionSpec() throws IOException {
+    IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
+        tableMetadataWithSchema1AndPartitionSpec1, 
tableMetadataWithSchema1AndPartitionSpec1,
+        VALIDATE_STRICT_PARTITION_EQUALITY_TRUE
+    );
+    Assert.assertTrue(true); // no-op: reaching this line means no IOException was thrown above
+  }
+
+  @Test
+  public void testValidatePartitionSpecWithDiffNameFails() {
+    PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1)
+        .identity("field2")
+        .build();
+    TableMetadata tableMetadataWithSchema1AndPartitionSpec12 = 
TableMetadata.newTableMetadata(schema1, partitionSpec12,
+        "tableLocationForSchema1WithPartitionSpec12", new HashMap<>());
+    
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpec1,
+        tableMetadataWithSchema1AndPartitionSpec12, 
PARTITION_SPEC_MISMATCH_EXCEPTION);
+  }
+
+  @Test
+  public void testValidatePartitionSpecWithUnpartitionedFails() {
+    
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndUnpartitionedSpec,
+        tableMetadataWithSchema1AndPartitionSpec1, 
PARTITION_SPEC_MISMATCH_EXCEPTION);
+  }
+
+  @Test
+  public void testPartitionSpecWithDifferentTransformFails() {
+    PartitionSpec partitionSpec12 = PartitionSpec.builderFor(schema1)
+        .truncate("field1", 4)
+        .build();
+    TableMetadata tableMetadataWithSchema1AndPartitionSpec12 = 
TableMetadata.newTableMetadata(schema1, partitionSpec12,
+        "tableLocationForSchema1WithPartitionSpec12", new HashMap<>());
+    
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpec1,
+        tableMetadataWithSchema1AndPartitionSpec12, 
PARTITION_SPEC_MISMATCH_EXCEPTION);
+  }
+
+  @Test
+  public void testStrictPartitionSpecEqualityOffVsOn() throws IOException {
+    PartitionSpec partitionSpecWithTwoCols = PartitionSpec.builderFor(schema1)
+        .identity("field1")
+        .identity("field2")
+        .build();
+
+    TableMetadata tableMetadataWithSchema1AndPartitionSpecWithTwoCols = 
TableMetadata.newTableMetadata(schema1,
+        partitionSpecWithTwoCols, 
"tableLocationForSchema1WithPartitionSpecWithTwoCols", new HashMap<>());
+    TableMetadata updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1 
= tableMetadataWithSchema1AndPartitionSpec1
+        
.updatePartitionSpec(tableMetadataWithSchema1AndPartitionSpecWithTwoCols.spec());
+
+    IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
+        tableMetadataWithSchema1AndPartitionSpecWithTwoCols,
+        updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1,
+        VALIDATE_STRICT_PARTITION_EQUALITY_FALSE);
+    Assert.assertTrue(true); // no-op marker: the non-strict call above did not throw
+    // ...but the same pair of tables fails when strict equality is requested
+    
verifyStrictFailUnlessCompatibleStructureThrows(tableMetadataWithSchema1AndPartitionSpecWithTwoCols,
+        updatedMetadataForTableMetadataWithSchema1AndPartitionSpec1, 
PARTITION_SPEC_MISMATCH_EXCEPTION);
+  }
+
+  private void verifyStrictFailUnlessCompatibleStructureThrows(TableMetadata 
tableAMetadata,
+      TableMetadata tableBMetadata, String expectedMessage) { // helper: always validates in strict mode
+    IOException exception = Assert.expectThrows(IOException.class, () -> {
+      
IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(tableAMetadata,
 tableBMetadata,
+          VALIDATE_STRICT_PARTITION_EQUALITY_TRUE);
+    });
+    Assert.assertTrue(exception.getMessage().startsWith(expectedMessage)); // only the message prefix is checked
+  }
+}

Reply via email to