nsivabalan commented on code in PR #7787:
URL: https://github.com/apache/hudi/pull/7787#discussion_r1090028568


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -227,6 +227,14 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue("true")
       .withDocumentation("Validate the schema used for the write against the latest schema, for backwards compatibility.");
 
+  public static final ConfigProperty<String> SCHEMA_ALLOW_DROP_COLUMNS = ConfigProperty

Review Comment:
   Let's define it as a string and not as a config property; we don't want to
expose this on our configurations page.
   Otherwise, we need to come up w/ a way to tag internal configs so that we can
fix our config docs generation.
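
A rough sketch of the second option, tagging internal configs so docs generation can skip them. Everything here is hypothetical: `Prop`, its `internal` flag, `documentedKeys`, and the example keys are stand-ins for illustration and are not Hudi's `ConfigProperty` API.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration only; not Hudi's ConfigProperty.
final class InternalConfigTagging {
  private InternalConfigTagging() {}

  // Simplified config descriptor carrying an "internal" tag (assumed field).
  static final class Prop {
    final String key;
    final String defaultValue;
    final boolean internal;

    Prop(String key, String defaultValue, boolean internal) {
      this.key = key;
      this.defaultValue = defaultValue;
      this.internal = internal;
    }
  }

  // A docs generator would list only the configs that are not tagged internal.
  static List<String> documentedKeys(List<Prop> props) {
    return props.stream()
        .filter(p -> !p.internal)
        .map(p -> p.key)
        .collect(Collectors.toList());
  }
}
```

With a tag like this, the property could stay a regular config while being left out of the generated configurations page.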
   
   



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java:
##########
@@ -230,19 +240,27 @@ public void testMORTable() throws Exception {
 
     // Now try updating w/ the original schema (should succeed)
     client = getHoodieWriteClient(hoodieWriteConfig);
-    updateBatch(hoodieWriteConfig, client, "009", "008", Option.empty(),
-                initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, numUpdateRecords, 4 * numRecords, 9);
+    try {
+      updateBatch(hoodieWriteConfig, client, "009", "008", Option.empty(),
+          initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, numUpdateRecords, 4 * numRecords, 9);
+    } catch (HoodieUpsertException e) {

Review Comment:
   similar comment as above



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala:
##########
@@ -270,7 +277,10 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionS
         appendData(fourthSchema, fourthBatch)
       }
     } else {
-      appendData(fourthSchema, fourthBatch)
+      assertThrows(classOf[SchemaCompatibilityException]) {
+        appendData(fourthSchema, fourthBatch)
+      }
+      appendData(fourthSchema, fourthBatch, shouldAllowDroppedColumns = true)

Review Comment:
   same here



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java:
##########
@@ -266,19 +284,24 @@ public void testCopyOnWriteTable() throws Exception {
     checkReadRecords("000", numRecords);
 
     // Inserting records w/ new evolved schema (w/ tip column dropped)
-    HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
+    HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED, shouldAllowDroppedColumns);
     client = getHoodieWriteClient(hoodieDevolvedWriteConfig);
     final List<HoodieRecord> failedRecords = generateInsertsWithSchema("004", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
-    writeBatch(client, "004", "003", Option.empty(), "003", numRecords,
-        (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords * 2, 1, false);
+    try {
+      writeBatch(client, "004", "003", Option.empty(), "003", numRecords,
+          (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords * 2, 1, false);
+    } catch (HoodieInsertException e) {
+      assertFalse(shouldAllowDroppedColumns);

Review Comment:
   ditto



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -416,7 +419,8 @@ object HoodieSparkSqlWriter {
             case None =>
               // In case schema reconciliation is enabled we will employ (legacy) reconciliation
               // strategy to produce target writer's schema (see definition below)
-              val (reconciledSchema, isCompatible) = reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema)
+              // NOTE: if schema reconciliation is turned on, then we should allow columns to be dropped
+              val (reconciledSchema, isCompatible) = reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema, shouldAllowDroppedColumns = true)

Review Comment:
   Why are we hardcoding shouldAllowDroppedColumns to true here? We already
derive the value at L408, right?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java:
##########
@@ -194,20 +199,25 @@ public void testMORTable() throws Exception {
     checkReadRecords("000", numRecords);
 
     // Insert with evolved schema (column dropped) is allowed
-    HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
+    HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED, shouldAllowDroppedColumns);
     client = getHoodieWriteClient(hoodieDevolvedWriteConfig);
     final List<HoodieRecord> failedRecords = generateInsertsWithSchema("005", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
     // We cannot use insertBatch directly here because we want to insert records
     // with a evolved schema and insertBatch inserts records using the TRIP_EXAMPLE_SCHEMA.
-    writeBatch(client, "005", "004", Option.empty(), "003", numRecords,
-        (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, false, numRecords, 2 * numRecords, 5, false);
+    try {
+      writeBatch(client, "005", "004", Option.empty(), "003", numRecords,
+          (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, false, numRecords, 2 * numRecords, 5, false);
+    } catch (HoodieInsertException e) {

Review Comment:
   After L209, before the catch block, we should also add:
   ```
   assertTrue(shouldAllowDroppedColumns); 
   ```
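
To spell out the shape being asked for: if the write succeeds, the flag must have been true, and if it throws, the flag must have been false. Below is a minimal standalone sketch of that pattern. `WriteRejectedException` and `write` are hypothetical stand-ins for `HoodieInsertException` and the `writeBatch` call, and the stand-in `write` is assumed to reject whenever dropped columns are not allowed.

```java
// Hypothetical illustration of the test shape; not the actual Hudi test code.
final class DroppedColumnWriteCheck {
  private DroppedColumnWriteCheck() {}

  // Stand-in for HoodieInsertException.
  static final class WriteRejectedException extends RuntimeException {}

  // Stand-in for writeBatch(...): assumed to reject when dropped columns are not allowed.
  static void write(boolean shouldAllowDroppedColumns) {
    if (!shouldAllowDroppedColumns) {
      throw new WriteRejectedException();
    }
  }

  // Success is only acceptable when the flag is true; rejection only when it is false.
  static String verify(boolean shouldAllowDroppedColumns) {
    try {
      write(shouldAllowDroppedColumns);
      // Equivalent of assertTrue(shouldAllowDroppedColumns) right after the write.
      if (!shouldAllowDroppedColumns) {
        throw new AssertionError("write should have been rejected");
      }
      return "succeeded";
    } catch (WriteRejectedException e) {
      // Equivalent of assertFalse(shouldAllowDroppedColumns) in the catch block.
      if (shouldAllowDroppedColumns) {
        throw new AssertionError("write should have succeeded");
      }
      return "rejected";
    }
  }
}
```

Without the check on the success path, a write that wrongly succeeds with the flag set to false would pass the test silently.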



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -36,10 +37,17 @@ public class AvroSchemaUtils {
   private AvroSchemaUtils() {}
 
   /**
-   * See {@link #isSchemaCompatible(Schema, Schema, boolean)} doc for more details
+   * See {@link #isSchemaCompatible(Schema, Schema, boolean, boolean)} doc for more details
    */
   public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema) {
-    return isSchemaCompatible(prevSchema, newSchema, true);
+    return isSchemaCompatible(prevSchema, newSchema, true, false);

Review Comment:
   Instead of false, can we use SCHEMA_ALLOW_DROP_COLUMNS.defaultValue()?
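
One caveat, sketched below: the property in this PR is declared as `ConfigProperty<String>`, so its default would presumably be a string and need parsing before it can be passed as a boolean. The class and constant here are hypothetical stand-ins, and the `"false"` default is an assumption taken from the literal being replaced.

```java
// Hypothetical illustration; DEFAULT_VALUE stands in for
// SCHEMA_ALLOW_DROP_COLUMNS.defaultValue(), assumed to be the string "false".
final class AllowDropColumnsDefault {
  private AllowDropColumnsDefault() {}

  static final String DEFAULT_VALUE = "false";

  // Parse the string default once so callers can pass a boolean.
  static boolean defaultAllowDroppedColumns() {
    return Boolean.parseBoolean(DEFAULT_VALUE);
  }
}
```

The call site would then read along the lines of `isSchemaCompatible(prevSchema, newSchema, true, Boolean.parseBoolean(...defaultValue()))`, keeping the default defined in one place.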



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -50,10 +58,20 @@ public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema) {
    * @param newSchema new instance of the schema
    * @param checkNaming controls whether schemas fully-qualified names should be checked
    */
-  public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema, boolean checkNaming) {
+  public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema, boolean checkNaming, boolean shouldAllowDroppedColumns) {
     // NOTE: We're establishing compatibility of the {@code prevSchema} and {@code newSchema}
     //       as following: {@code newSchema} is considered compatible to {@code prevSchema},
     //       iff data written using {@code prevSchema} could be read by {@code newSchema}
+
+    if (!shouldAllowDroppedColumns) {
+      // Check that each field in the oldSchema can be populated in the newSchema
+      for (final Schema.Field oldSchemaField : prevSchema.getFields()) {

Review Comment:
   We can simplify this to:
   ```
   boolean isAnyColDropped = prevSchema.getFields().stream()
             .anyMatch(oldSchemaField -> SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField) == null);
   ```
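
For reference, a standalone sketch of the same stream-based check with plain field names instead of Avro `Schema.Field` objects. `DroppedColumnCheck` and its list-of-names inputs are hypothetical simplifications. It uses `anyMatch` on purpose: `Stream.findAny()` throws a `NullPointerException` when the selected element is null, so filtering for nulls and then calling `findAny()` is unsafe.

```java
import java.util.List;

// Hypothetical illustration; field names stand in for Avro Schema.Field lookups.
final class DroppedColumnCheck {
  private DroppedColumnCheck() {}

  // Returns true if any field of the previous schema is missing from the new one.
  static boolean isAnyColDropped(List<String> prevFields, List<String> newFields) {
    return prevFields.stream().anyMatch(field -> !newFields.contains(field));
  }
}
```

Adding a column to the new schema does not count as a drop here; only fields present before and absent now do.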



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -88,7 +106,7 @@ public static boolean isCompatibleProjectionOf(Schema sourceSchema, Schema targe
   private static boolean isAtomicSchemasCompatible(Schema oneAtomicType, Schema anotherAtomicType) {
     // NOTE: Checking for compatibility of atomic types, we should ignore their
     //       corresponding fully-qualified names (as irrelevant)
-    return isSchemaCompatible(oneAtomicType, anotherAtomicType, false);
+    return isSchemaCompatible(oneAtomicType, anotherAtomicType, false, true);

Review Comment:
   Why are we setting true here as the default for shouldAllowDroppedColumns?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala:
##########
@@ -217,7 +217,14 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionS
       Row("8", "Ron", "14", 1, 1),
       Row("9", "Germiona", "16", 1, 1))
 
-    appendData(thirdSchema, thirdBatch)
+    if (shouldReconcileSchema) {

Review Comment:
   Let's fix the comments at L204.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java:
##########
@@ -299,9 +322,13 @@ public void testCopyOnWriteTable() throws Exception {
 
     // Now try updating w/ the original schema (should succeed)
     client = getHoodieWriteClient(hoodieWriteConfig);
-    updateBatch(hoodieWriteConfig, client, "008", "007", Option.empty(),
-                initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, true,
-                numUpdateRecords, 3 * numRecords, 8);
+    try {
+      updateBatch(hoodieWriteConfig, client, "008", "007", Option.empty(),
+          initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, true,
+          numUpdateRecords, 3 * numRecords, 8);
+    } catch (HoodieUpsertException e) {
+      assertFalse(shouldAllowDroppedColumns);

Review Comment:
   ditto



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala:
##########
@@ -217,7 +217,14 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionS
       Row("8", "Ron", "14", 1, 1),
       Row("9", "Germiona", "16", 1, 1))
 
-    appendData(thirdSchema, thirdBatch)
+    if (shouldReconcileSchema) {
+      appendData(thirdSchema, thirdBatch)
+    } else {
+      assertThrows(classOf[SchemaCompatibilityException]) {
+        appendData(thirdSchema, thirdBatch)
+      }
+      appendData(thirdSchema, thirdBatch, shouldAllowDroppedColumns = true)

Review Comment:
   Again, the last arg should just be `shouldAllowDroppedColumns`, not
`shouldAllowDroppedColumns = true`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
