Davis-Zhang-Onehouse commented on code in PR #12781:
URL: https://github.com/apache/hudi/pull/12781#discussion_r1964569457


##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java:
##########
@@ -167,6 +167,14 @@ protected void initMetaClient(boolean preTableVersion8) 
throws IOException {
         preTableVersion8 ? Option.of(HoodieTableVersion.SIX) : 
Option.of(HoodieTableVersion.current()));
   }
 
+  protected void initMetaClient(boolean preTableVersion8, HoodieTableType 
tableType) throws IOException {

Review Comment:
   done



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -298,12 +296,17 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> 
executeClustering(HoodieC
     // Disable auto commit. Strategy is only expected to write data in new 
files.
     config.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE, 
Boolean.FALSE.toString());
 
-    final Schema schema = new 
TableSchemaResolver(table.getMetaClient()).getTableAvroSchemaForClustering(false).get();
+    Option<Schema> schema;
+    try {
+      schema = new 
TableSchemaGetter(table.getMetaClient()).getTableAvroSchemaIfPresent(false);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }

Review Comment:
   changed to table schema resolver



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -140,8 +140,14 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
       + "{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},"
       + 
"{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},";
 
+  public static final String EXTRA_COL_SCHEMA1 = "{\"name\": 
\"extra_column1\", \"type\": [\"null\", \"string\"], \"default\": null },";
+  public static final String EXTRA_COL_SCHEMA2 = "{\"name\": 
\"extra_column2\", \"type\": [\"null\", \"string\"], \"default\": null},";

Review Comment:
   Hudi does not support non-null default values well; there are a few caveats. 
We prefer to default to null or to have no default value at all. I can walk you 
through why offline.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -168,13 +189,238 @@ private static Iterable<Object[]> 
providerClassResolutionStrategyAndTableType()
     List<Object[]> opts = new ArrayList<>();
     for (Object providerClass : LOCK_PROVIDER_CLASSES) {
       for (ConflictResolutionStrategy resolutionStrategy : 
CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
-        opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass, 
resolutionStrategy});
-        opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass, 
resolutionStrategy});
+        opts.add(new Object[] {COPY_ON_WRITE, providerClass, 
resolutionStrategy});
+        opts.add(new Object[] {MERGE_ON_READ, providerClass, 
resolutionStrategy});
       }
     }
     return opts;
   }
 
+  public static Stream<Arguments> concurrentAlterSchemaTestDimension() {
+    Object[][] data =
+        new Object[][] {
+            // First element set to true means before testing anything, we 
make a commit
+            // with schema TRIP_EXAMPLE_SCHEMA.
+            // {<should create initial commit>, <table type>, <txn 1 writer 
schema>, <txn 2 writer schema>,
+            // <should schema conflict>, <expected table schema after 
resolution>}
+            // --------------|-----read write-----|validate & 
commit|------------------------- txn 1
+            // --------------------------------|------read 
write------|--validate & commit--|- txn 2
+            // 
committed->|------------------------------------------------------------------- 
initial commit (optional)
+
+            // No schema evolution, no conflict.
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            // No concurrent schema evolution, no conflict.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // If there is a initial commits defining table schema to 
TRIP_EXAMPLE_SCHEMA.
+            // as long as txn 2 stick to that schema, backwards compatibility 
handles everything.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // In case no initial commits, table schema is not really 
predefined.
+            // It means are effectively having 2 concurrent txn trying to 
define table schema
+            // differently in this case.
+            {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // Concurrent schema evolution into the same schema does not 
conflict.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // Concurrent schema evolution into different schemas conflicts.
+            // from clustering operation, instead of 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1 (from commit 2)
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+        };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  /**
+   * Injects a new instant with customizable schema in commit metadata
+   * @param timestamp Instant timestamp
+   * @param schemaAttrValue Schema value to set in commit metadata (can be 
null)
+   */
+  public void injectInstantWithSchema(
+      String timestamp,
+      String action,
+      String schemaAttrValue) throws Exception {
+    HoodieTestTable instantGenerator = HoodieTestTable.of(metaClient);
+    instantGenerator.addCommit(timestamp, Option.of(buildMetadata(
+        Collections.emptyList(),
+        Collections.emptyMap(),
+        Option.empty(),
+        WriteOperationType.UNKNOWN,
+        schemaAttrValue,
+        action)));
+  }
+
+  @ParameterizedTest
+  @MethodSource("concurrentAlterSchemaTestDimension")
+  void testHoodieClientWithSchemaConflictResolution(
+      boolean createInitialCommit,
+      boolean createEmptyInitialCommit,
+      HoodieTableType tableType,
+      String writerSchema1,
+      String writerSchema2,
+      boolean shouldConflict,
+      String expectedTableSchemaAfterResolution) throws Exception {
+    if (tableType.equals(MERGE_ON_READ)) {
+      setUpMORTestTable();
+    }
+
+    Properties properties = new Properties();
+    properties.setProperty(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key(), 
String.valueOf(0));
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, 
"3000");
+    properties.setProperty(ENABLE_SCHEMA_CONFLICT_RESOLUTION.key(), "true");
+
+    HoodieWriteConfig.Builder writeConfigBuilder = 
getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.MEMORY)
+            
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+        .withAutoCommit(false)
+        .withProperties(properties);
+    HoodieWriteConfig writeConfig = writeConfigBuilder.build();
+    final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
+
+    int totalCommits = 0;
+    final String nextCommitTime11 = "0011";
+    final String nextCommitTime12 = "0012";
+    if (createInitialCommit) {
+      // Create the first commit ingesting some data.
+      createCommitWithInserts(writeConfig, client1, "000", nextCommitTime11, 
100, true);
+      createCommitWithUpserts(writeConfig, client1, nextCommitTime11, 
Option.empty(), nextCommitTime12, 100);
+      totalCommits += 2;
+    }
+
+    if (createEmptyInitialCommit) {
+      // Create an empty commit which does not contain a valid schema, schema 
conflict resolution should still be able
+      // to handle it properly. This can happen in delta streamer where no 
data is ingested but empty commit is created
+      // to save the checkpoint.
+      HoodieWriteConfig writeConfig22 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+      writeConfig22.setSchema("\"null\"");
+      final SparkRDDWriteClient client22 = getHoodieWriteClient(writeConfig22);
+      JavaRDD<HoodieRecord> emptyRDD = jsc.emptyRDD();
+      // Perform upsert with empty RDD
+      client22.startCommitWithTime("0013");
+      JavaRDD<WriteStatus> writeStatusRDD = client22.upsert(emptyRDD, "0013");
+      client22.commit("0013", writeStatusRDD);
+      totalCommits += 1;
+
+      // Validate table schema in the end.
+      TableSchemaResolver r = new TableSchemaResolver(metaClient);
+      // Assert no table schema is defined.
+      assertThrows(HoodieSchemaNotFoundException.class, () -> 
r.getTableAvroSchema(false));
+    }
+
+    // Start txn 002 altering table schema
+    HoodieWriteConfig writeConfig2 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+    writeConfig2.setSchema(writerSchema1);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig2);
+    final String nextCommitTime21 = "0021";
+    startSchemaEvolutionTransaction(metaClient, client2, nextCommitTime21, 
tableType);
+
+    // Start concurrent txn 003 alter table schema
+    HoodieWriteConfig writeConfig3 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+    writeConfig3.setSchema(writerSchema2);
+    final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig3);
+    final String nextCommitTime31 = "0031";
+    startSchemaEvolutionTransaction(metaClient, client3, nextCommitTime31, 
tableType);
+
+    Properties props = new TypedProperties();
+    props.setProperty("hoodie.datasource.write.row.writer.enable", "false");
+    HoodieWriteConfig clusterWriteCfg = writeConfigBuilder
+        .withProperties(props)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(true)
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withInlineClustering(true)

Review Comment:
   oh yeah, removed



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -168,13 +189,238 @@ private static Iterable<Object[]> 
providerClassResolutionStrategyAndTableType()
     List<Object[]> opts = new ArrayList<>();
     for (Object providerClass : LOCK_PROVIDER_CLASSES) {
       for (ConflictResolutionStrategy resolutionStrategy : 
CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
-        opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass, 
resolutionStrategy});
-        opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass, 
resolutionStrategy});
+        opts.add(new Object[] {COPY_ON_WRITE, providerClass, 
resolutionStrategy});
+        opts.add(new Object[] {MERGE_ON_READ, providerClass, 
resolutionStrategy});
       }
     }
     return opts;
   }
 
+  public static Stream<Arguments> concurrentAlterSchemaTestDimension() {
+    Object[][] data =
+        new Object[][] {
+            // First element set to true means before testing anything, we 
make a commit
+            // with schema TRIP_EXAMPLE_SCHEMA.
+            // {<should create initial commit>, <table type>, <txn 1 writer 
schema>, <txn 2 writer schema>,
+            // <should schema conflict>, <expected table schema after 
resolution>}
+            // --------------|-----read write-----|validate & 
commit|------------------------- txn 1
+            // --------------------------------|------read 
write------|--validate & commit--|- txn 2
+            // 
committed->|------------------------------------------------------------------- 
initial commit (optional)
+
+            // No schema evolution, no conflict.
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            // No concurrent schema evolution, no conflict.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // If there is a initial commits defining table schema to 
TRIP_EXAMPLE_SCHEMA.
+            // as long as txn 2 stick to that schema, backwards compatibility 
handles everything.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // In case no initial commits, table schema is not really 
predefined.
+            // It means are effectively having 2 concurrent txn trying to 
define table schema
+            // differently in this case.
+            {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // Concurrent schema evolution into the same schema does not 
conflict.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // Concurrent schema evolution into different schemas conflicts.
+            // from clustering operation, instead of 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1 (from commit 2)
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+        };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  /**
+   * Injects a new instant with customizable schema in commit metadata
+   * @param timestamp Instant timestamp
+   * @param schemaAttrValue Schema value to set in commit metadata (can be 
null)
+   */
+  public void injectInstantWithSchema(
+      String timestamp,
+      String action,
+      String schemaAttrValue) throws Exception {
+    HoodieTestTable instantGenerator = HoodieTestTable.of(metaClient);
+    instantGenerator.addCommit(timestamp, Option.of(buildMetadata(
+        Collections.emptyList(),
+        Collections.emptyMap(),
+        Option.empty(),
+        WriteOperationType.UNKNOWN,
+        schemaAttrValue,
+        action)));
+  }
+
+  @ParameterizedTest
+  @MethodSource("concurrentAlterSchemaTestDimension")
+  void testHoodieClientWithSchemaConflictResolution(
+      boolean createInitialCommit,
+      boolean createEmptyInitialCommit,
+      HoodieTableType tableType,
+      String writerSchema1,
+      String writerSchema2,
+      boolean shouldConflict,
+      String expectedTableSchemaAfterResolution) throws Exception {
+    if (tableType.equals(MERGE_ON_READ)) {
+      setUpMORTestTable();
+    }
+
+    Properties properties = new Properties();
+    properties.setProperty(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key(), 
String.valueOf(0));
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, 
"3000");
+    properties.setProperty(ENABLE_SCHEMA_CONFLICT_RESOLUTION.key(), "true");
+
+    HoodieWriteConfig.Builder writeConfigBuilder = 
getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.MEMORY)
+            
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+        .withAutoCommit(false)
+        .withProperties(properties);
+    HoodieWriteConfig writeConfig = writeConfigBuilder.build();
+    final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
+
+    int totalCommits = 0;
+    final String nextCommitTime11 = "0011";
+    final String nextCommitTime12 = "0012";
+    if (createInitialCommit) {
+      // Create the first commit ingesting some data.
+      createCommitWithInserts(writeConfig, client1, "000", nextCommitTime11, 
100, true);
+      createCommitWithUpserts(writeConfig, client1, nextCommitTime11, 
Option.empty(), nextCommitTime12, 100);
+      totalCommits += 2;
+    }
+
+    if (createEmptyInitialCommit) {
+      // Create an empty commit which does not contain a valid schema, schema 
conflict resolution should still be able
+      // to handle it properly. This can happen in delta streamer where no 
data is ingested but empty commit is created
+      // to save the checkpoint.
+      HoodieWriteConfig writeConfig22 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+      writeConfig22.setSchema("\"null\"");
+      final SparkRDDWriteClient client22 = getHoodieWriteClient(writeConfig22);
+      JavaRDD<HoodieRecord> emptyRDD = jsc.emptyRDD();
+      // Perform upsert with empty RDD
+      client22.startCommitWithTime("0013");
+      JavaRDD<WriteStatus> writeStatusRDD = client22.upsert(emptyRDD, "0013");
+      client22.commit("0013", writeStatusRDD);
+      totalCommits += 1;
+
+      // Validate table schema in the end.
+      TableSchemaResolver r = new TableSchemaResolver(metaClient);
+      // Assert no table schema is defined.
+      assertThrows(HoodieSchemaNotFoundException.class, () -> 
r.getTableAvroSchema(false));
+    }
+
+    // Start txn 002 altering table schema
+    HoodieWriteConfig writeConfig2 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+    writeConfig2.setSchema(writerSchema1);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig2);
+    final String nextCommitTime21 = "0021";
+    startSchemaEvolutionTransaction(metaClient, client2, nextCommitTime21, 
tableType);
+
+    // Start concurrent txn 003 alter table schema
+    HoodieWriteConfig writeConfig3 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+    writeConfig3.setSchema(writerSchema2);
+    final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig3);
+    final String nextCommitTime31 = "0031";
+    startSchemaEvolutionTransaction(metaClient, client3, nextCommitTime31, 
tableType);
+
+    Properties props = new TypedProperties();
+    props.setProperty("hoodie.datasource.write.row.writer.enable", "false");

Review Comment:
   Removed, and there is no issue with the test.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -300,6 +300,13 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Schema string representing the latest schema of the 
table. Hudi passes this to "
           + "implementations of evolution of schema");
 
+  public static final ConfigProperty<Boolean> 
ENABLE_SCHEMA_CONFLICT_RESOLUTION = ConfigProperty
+      .key(CONCURRENCY_PREFIX + "schema.conflict.resolution.enable")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("1.0.2")

Review Comment:
   done



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -185,43 +176,32 @@ public Option<Schema> getTableAvroSchemaIfPresent(boolean 
includeMetadataFields)
     return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
   }
 
-  Option<Schema> getTableAvroSchemaInternal(boolean includeMetadataFields, 
Option<HoodieInstant> instantOpt) {
-    return (instantOpt.isPresent()
-        ? getTableSchemaFromCommitMetadata(instantOpt.get(), 
includeMetadataFields)
-        : getTableSchemaFromLatestCommitMetadata(includeMetadataFields))
-        .or(() -> getTableCreateSchemaWithMetadata(includeMetadataFields))
-        .or(() -> getSchemaFromDataFileIfPresent(includeMetadataFields))
-        .map(this::handlePartitionColumnsIfNeeded);
-  }
-
-  /**
-   * Retrieves the table creation schema with metadata fields and partition 
columns handled.
-   *
-   * @param includeMetadataFields whether to include metadata fields in the 
schema
-   * @return Option containing the fully processed schema if available, empty 
Option otherwise
-   */
-  public Option<Schema> getTableCreateSchemaWithMetadata(boolean 
includeMetadataFields) {
-    return metaClient.getTableConfig().getTableCreateSchema()
-        .map(tableSchema ->
-            includeMetadataFields
-                ? HoodieAvroUtils.addMetadataFields(tableSchema, 
hasOperationField.get())
-                : tableSchema)
-        .map(this::handlePartitionColumnsIfNeeded);
-  }
-
-  /**
-   * Handles partition column logic for a given schema.
-   *
-   * @param schema the input schema to process
-   * @return the processed schema with partition columns handled appropriately
-   */
-  private Schema handlePartitionColumnsIfNeeded(Schema schema) {
-    if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
+  private Option<Schema> getTableAvroSchemaInternal(boolean 
includeMetadataFields, Option<HoodieInstant> instantOpt) {

Review Comment:
   yes, as we discussed, no changes to the old table schema resolver



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java:
##########
@@ -53,6 +54,12 @@ public HoodieReplaceCommitMetadata(boolean compacted) {
     partitionToReplaceFileIds = new HashMap<>();
   }
 
+  @VisibleForTesting
+  public HoodieReplaceCommitMetadata(HoodieCommitMetadata metadata) {
+    super(metadata);
+    partitionToReplaceFileIds = new HashMap<>();
+  }

Review Comment:
   This was reverted in the latest commit; when I check out the branch, I 
cannot find this change.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java:
##########
@@ -3829,7 +3829,7 @@ protected HoodieTableType getTableType() {
    * TODO: Fix this and increase test coverage to include clustering via row 
writers
    * @return
    */
-  private static Properties getDisabledRowWriterProperties() {
+  public static Properties getDisabledRowWriterProperties() {

Review Comment:
   reverted



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -157,6 +158,19 @@ protected HoodieTable(HoodieWriteConfig config, 
HoodieEngineContext context, Hoo
     this.taskContextSupplier = context.getTaskContextSupplier();
   }
 
+  @VisibleForTesting
+  protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context, 
HoodieTableMetaClient metaClient, FileSystemViewManager viewManager, 
TaskContextSupplier supplier) {

Review Comment:
   We can, but then we would need to add a lot of extra mocks for viewManager 
and supplier. That does not help the test; it only makes it unnecessarily 
complex on a code path we don't care about at all.



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java:
##########
@@ -372,7 +372,8 @@ public void testGetCommitsTimelineAfter() throws 
IOException {
   private HoodieTableMetaClient prepareMetaClient(
       List<HoodieInstant> activeInstants,
       List<HoodieInstant> archivedInstants,
-      String startTs) throws IOException {
+      String startTs
+  ) throws IOException {

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -1081,11 +1327,23 @@ private JavaRDD<WriteStatus> 
createCommitWithInserts(HoodieWriteConfig cfg, Spar
     return result;
   }
 
+  private static void startSchemaEvolutionTransaction(HoodieTableMetaClient 
metaClient, SparkRDDWriteClient client, String nextCommitTime2, HoodieTableType 
tableType) throws IOException {
+    String commitActionType = 
CommitUtils.getCommitActionType(WriteOperationType.UPSERT, tableType);
+    client.startCommitWithTime(nextCommitTime2, commitActionType);
+    client.preWrite(nextCommitTime2, WriteOperationType.UPSERT, 
client.createMetaClient(true));
+    HoodieInstant requested = 
metaClient.createNewInstant(HoodieInstant.State.REQUESTED, commitActionType, 
nextCommitTime2);
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    metadata.setOperationType(WriteOperationType.UPSERT);
+    
client.createMetaClient(true).getActiveTimeline().transitionRequestedToInflight(
+        requested, 
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));

Review Comment:
   revised to the updated API



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -168,13 +189,238 @@ private static Iterable<Object[]> 
providerClassResolutionStrategyAndTableType()
     List<Object[]> opts = new ArrayList<>();
     for (Object providerClass : LOCK_PROVIDER_CLASSES) {
       for (ConflictResolutionStrategy resolutionStrategy : 
CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
-        opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass, 
resolutionStrategy});
-        opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass, 
resolutionStrategy});
+        opts.add(new Object[] {COPY_ON_WRITE, providerClass, 
resolutionStrategy});
+        opts.add(new Object[] {MERGE_ON_READ, providerClass, 
resolutionStrategy});
       }
     }
     return opts;
   }
 
+  public static Stream<Arguments> concurrentAlterSchemaTestDimension() {
+    Object[][] data =
+        new Object[][] {
+            // First element set to true means before testing anything, we 
make a commit
+            // with schema TRIP_EXAMPLE_SCHEMA.
+            // {<should create initial commit>, <should create empty initial 
commit>, <table type>,
+            // <txn 1 writer schema>, <txn 2 writer schema>, <should schema 
conflict>,
+            // <expected table schema after resolution>}
+            // --------------|-----read write-----|validate & 
commit|------------------------- txn 1
+            // --------------------------------|------read 
write------|--validate & commit--|- txn 2
+            // 
committed->|------------------------------------------------------------------- 
initial commit (optional)
+
+            // No schema evolution, no conflict.
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            // No concurrent schema evolution, no conflict.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // If there is an initial commit defining the table schema as 
TRIP_EXAMPLE_SCHEMA,
+            // then as long as txn 2 sticks to that schema, backwards compatibility 
handles everything.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // With no initial commit, the table schema is not really 
predefined.
+            // It means we effectively have 2 concurrent txns trying to 
define the table schema
+            // differently in this case.
+            {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // Concurrent schema evolution into the same schema does not 
conflict.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // Concurrent schema evolution into different schemas conflicts.
+            // from clustering operation, instead of 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1 (from commit 2)
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+        };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  /**
+   * Injects a new instant with customizable schema in commit metadata.
+   * @param timestamp Instant timestamp
+   * @param action Action value passed through when building the commit metadata
+   * @param schemaAttrValue Schema value to set in commit metadata (can be 
null)
+   */
+  public void injectInstantWithSchema(
+      String timestamp,
+      String action,
+      String schemaAttrValue) throws Exception {
+    HoodieTestTable instantGenerator = HoodieTestTable.of(metaClient);
+    instantGenerator.addCommit(timestamp, Option.of(buildMetadata(
+        Collections.emptyList(),
+        Collections.emptyMap(),
+        Option.empty(),
+        WriteOperationType.UNKNOWN,
+        schemaAttrValue,
+        action)));
+  }
+
+  @ParameterizedTest
+  @MethodSource("concurrentAlterSchemaTestDimension")
+  void testHoodieClientWithSchemaConflictResolution(
+      boolean createInitialCommit,
+      boolean createEmptyInitialCommit,
+      HoodieTableType tableType,
+      String writerSchema1,
+      String writerSchema2,
+      boolean shouldConflict,
+      String expectedTableSchemaAfterResolution) throws Exception {
+    if (tableType.equals(MERGE_ON_READ)) {
+      setUpMORTestTable();
+    }
+
+    Properties properties = new Properties();
+    properties.setProperty(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key(), 
String.valueOf(0));
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, 
"3000");
+    properties.setProperty(ENABLE_SCHEMA_CONFLICT_RESOLUTION.key(), "true");
+
+    HoodieWriteConfig.Builder writeConfigBuilder = 
getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.MEMORY)
+            
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+        .withAutoCommit(false)
+        .withProperties(properties);
+    HoodieWriteConfig writeConfig = writeConfigBuilder.build();
+    final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
+
+    int totalCommits = 0;
+    final String nextCommitTime11 = "0011";
+    final String nextCommitTime12 = "0012";
+    if (createInitialCommit) {
+      // Create the first commit ingesting some data.
+      createCommitWithInserts(writeConfig, client1, "000", nextCommitTime11, 
100, true);
+      createCommitWithUpserts(writeConfig, client1, nextCommitTime11, 
Option.empty(), nextCommitTime12, 100);
+      totalCommits += 2;
+    }
+
+    if (createEmptyInitialCommit) {
+      // Create an empty commit which does not contain a valid schema, schema 
conflict resolution should still be able
+      // to handle it properly. This can happen in delta streamer where no 
data is ingested but empty commit is created
+      // to save the checkpoint.
+      HoodieWriteConfig writeConfig22 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+      writeConfig22.setSchema("\"null\"");
+      final SparkRDDWriteClient client22 = getHoodieWriteClient(writeConfig22);
+      JavaRDD<HoodieRecord> emptyRDD = jsc.emptyRDD();
+      // Perform upsert with empty RDD
+      client22.startCommitWithTime("0013");
+      JavaRDD<WriteStatus> writeStatusRDD = client22.upsert(emptyRDD, "0013");
+      client22.commit("0013", writeStatusRDD);
+      totalCommits += 1;
+
+      // Validate table schema in the end.
+      TableSchemaResolver r = new TableSchemaResolver(metaClient);
+      // Assert no table schema is defined.
+      assertThrows(HoodieSchemaNotFoundException.class, () -> 
r.getTableAvroSchema(false));
+    }
+
+    // Start txn 002 altering table schema
+    HoodieWriteConfig writeConfig2 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+    writeConfig2.setSchema(writerSchema1);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig2);
+    final String nextCommitTime21 = "0021";
+    startSchemaEvolutionTransaction(metaClient, client2, nextCommitTime21, 
tableType);
+
+    // Start concurrent txn 003 alter table schema
+    HoodieWriteConfig writeConfig3 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+    writeConfig3.setSchema(writerSchema2);
+    final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig3);
+    final String nextCommitTime31 = "0031";
+    startSchemaEvolutionTransaction(metaClient, client3, nextCommitTime31, 
tableType);
+
+    Properties props = new TypedProperties();
+    props.setProperty("hoodie.datasource.write.row.writer.enable", "false");
+    HoodieWriteConfig clusterWriteCfg = writeConfigBuilder
+        .withProperties(props)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(true)

Review Comment:
   Revised it to set clustering/compaction based on table type: for MOR we want 
to run compaction. Also renamed the variable to avoid confusion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to