yihua commented on code in PR #12781:
URL: https://github.com/apache/hudi/pull/12781#discussion_r1964413907


##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -168,13 +189,238 @@ private static Iterable<Object[]> 
providerClassResolutionStrategyAndTableType()
     List<Object[]> opts = new ArrayList<>();
     for (Object providerClass : LOCK_PROVIDER_CLASSES) {
       for (ConflictResolutionStrategy resolutionStrategy : 
CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
-        opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass, 
resolutionStrategy});
-        opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass, 
resolutionStrategy});
+        opts.add(new Object[] {COPY_ON_WRITE, providerClass, 
resolutionStrategy});
+        opts.add(new Object[] {MERGE_ON_READ, providerClass, 
resolutionStrategy});
       }
     }
     return opts;
   }
 
+  public static Stream<Arguments> concurrentAlterSchemaTestDimension() {
+    Object[][] data =
+        new Object[][] {
+            // First element set to true means before testing anything, we 
make a commit
+            // with schema TRIP_EXAMPLE_SCHEMA.
+            // {<should create initial commit>, <table type>, <txn 1 writer 
schema>, <txn 2 writer schema>,
+            // <should schema conflict>, <expected table schema after 
resolution>}
+            // --------------|-----read write-----|validate & 
commit|------------------------- txn 1
+            // --------------------------------|------read 
write------|--validate & commit--|- txn 2
+            // 
committed->|------------------------------------------------------------------- 
initial commit (optional)
+
+            // No schema evolution, no conflict.
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            // No concurrent schema evolution, no conflict.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // If there are initial commits defining the table schema as 
TRIP_EXAMPLE_SCHEMA,
+            // as long as txn 2 sticks to that schema, backward compatibility 
handles everything.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // In case there are no initial commits, the table schema is not really 
predefined.
+            // It means we effectively have 2 concurrent txns trying to 
define the table schema
+            // differently in this case.
+            {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // Concurrent schema evolution into the same schema does not 
conflict.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // Concurrent schema evolution into different schemas conflicts.
+            // from clustering operation, instead of 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1 (from commit 2)
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+        };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  /**
+   * Injects a new instant with customizable schema in commit metadata
+   * @param timestamp Instant timestamp
+   * @param schemaAttrValue Schema value to set in commit metadata (can be 
null)
+   */
+  public void injectInstantWithSchema(
+      String timestamp,
+      String action,
+      String schemaAttrValue) throws Exception {
+    HoodieTestTable instantGenerator = HoodieTestTable.of(metaClient);
+    instantGenerator.addCommit(timestamp, Option.of(buildMetadata(
+        Collections.emptyList(),
+        Collections.emptyMap(),
+        Option.empty(),
+        WriteOperationType.UNKNOWN,
+        schemaAttrValue,
+        action)));
+  }
+
+  @ParameterizedTest
+  @MethodSource("concurrentAlterSchemaTestDimension")
+  void testHoodieClientWithSchemaConflictResolution(
+      boolean createInitialCommit,
+      boolean createEmptyInitialCommit,
+      HoodieTableType tableType,
+      String writerSchema1,
+      String writerSchema2,
+      boolean shouldConflict,
+      String expectedTableSchemaAfterResolution) throws Exception {
+    if (tableType.equals(MERGE_ON_READ)) {
+      setUpMORTestTable();
+    }
+
+    Properties properties = new Properties();
+    properties.setProperty(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key(), 
String.valueOf(0));
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, 
"3000");
+    properties.setProperty(ENABLE_SCHEMA_CONFLICT_RESOLUTION.key(), "true");
+
+    HoodieWriteConfig.Builder writeConfigBuilder = 
getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.MEMORY)
+            
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+        .withAutoCommit(false)
+        .withProperties(properties);
+    HoodieWriteConfig writeConfig = writeConfigBuilder.build();
+    final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
+
+    int totalCommits = 0;
+    final String nextCommitTime11 = "0011";
+    final String nextCommitTime12 = "0012";
+    if (createInitialCommit) {
+      // Create the first commit ingesting some data.
+      createCommitWithInserts(writeConfig, client1, "000", nextCommitTime11, 
100, true);
+      createCommitWithUpserts(writeConfig, client1, nextCommitTime11, 
Option.empty(), nextCommitTime12, 100);
+      totalCommits += 2;
+    }
+
+    if (createEmptyInitialCommit) {
+      // Create an empty commit which does not contain a valid schema, schema 
conflict resolution should still be able
+      // to handle it properly. This can happen in delta streamer where no 
data is ingested but empty commit is created
+      // to save the checkpoint.
+      HoodieWriteConfig writeConfig22 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+      writeConfig22.setSchema("\"null\"");
+      final SparkRDDWriteClient client22 = getHoodieWriteClient(writeConfig22);
+      JavaRDD<HoodieRecord> emptyRDD = jsc.emptyRDD();
+      // Perform upsert with empty RDD
+      client22.startCommitWithTime("0013");
+      JavaRDD<WriteStatus> writeStatusRDD = client22.upsert(emptyRDD, "0013");
+      client22.commit("0013", writeStatusRDD);
+      totalCommits += 1;
+
+      // Validate table schema in the end.
+      TableSchemaResolver r = new TableSchemaResolver(metaClient);
+      // Assert no table schema is defined.
+      assertThrows(HoodieSchemaNotFoundException.class, () -> 
r.getTableAvroSchema(false));
+    }
+
+    // Start txn 002 altering table schema
+    HoodieWriteConfig writeConfig2 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+    writeConfig2.setSchema(writerSchema1);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig2);
+    final String nextCommitTime21 = "0021";
+    startSchemaEvolutionTransaction(metaClient, client2, nextCommitTime21, 
tableType);
+
+    // Start concurrent txn 003 alter table schema
+    HoodieWriteConfig writeConfig3 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+    writeConfig3.setSchema(writerSchema2);
+    final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig3);
+    final String nextCommitTime31 = "0031";
+    startSchemaEvolutionTransaction(metaClient, client3, nextCommitTime31, 
tableType);
+
+    Properties props = new TypedProperties();
+    props.setProperty("hoodie.datasource.write.row.writer.enable", "false");
+    HoodieWriteConfig clusterWriteCfg = writeConfigBuilder
+        .withProperties(props)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(true)
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withInlineClustering(true)

Review Comment:
   Should this be `withScheduleInlineClustering` as you want to schedule the 
clustering without executing it in this test for validating the schema conflict 
resolution with async clustering?



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -1081,11 +1327,23 @@ private JavaRDD<WriteStatus> 
createCommitWithInserts(HoodieWriteConfig cfg, Spar
     return result;
   }
 
+  private static void startSchemaEvolutionTransaction(HoodieTableMetaClient 
metaClient, SparkRDDWriteClient client, String nextCommitTime2, HoodieTableType 
tableType) throws IOException {
+    String commitActionType = 
CommitUtils.getCommitActionType(WriteOperationType.UPSERT, tableType);
+    client.startCommitWithTime(nextCommitTime2, commitActionType);
+    client.preWrite(nextCommitTime2, WriteOperationType.UPSERT, 
client.createMetaClient(true));
+    HoodieInstant requested = 
metaClient.createNewInstant(HoodieInstant.State.REQUESTED, commitActionType, 
nextCommitTime2);
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    metadata.setOperationType(WriteOperationType.UPSERT);
+    
client.createMetaClient(true).getActiveTimeline().transitionRequestedToInflight(
+        requested, 
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));

Review Comment:
   The timeline in table version 8 (default) uses Avro to serialize the commit 
metadata. Is that changed here, or are the test utils outdated?



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -168,13 +189,238 @@ private static Iterable<Object[]> 
providerClassResolutionStrategyAndTableType()
     List<Object[]> opts = new ArrayList<>();
     for (Object providerClass : LOCK_PROVIDER_CLASSES) {
       for (ConflictResolutionStrategy resolutionStrategy : 
CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
-        opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass, 
resolutionStrategy});
-        opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass, 
resolutionStrategy});
+        opts.add(new Object[] {COPY_ON_WRITE, providerClass, 
resolutionStrategy});
+        opts.add(new Object[] {MERGE_ON_READ, providerClass, 
resolutionStrategy});
       }
     }
     return opts;
   }
 
+  public static Stream<Arguments> concurrentAlterSchemaTestDimension() {
+    Object[][] data =
+        new Object[][] {
+            // First element set to true means before testing anything, we 
make a commit
+            // with schema TRIP_EXAMPLE_SCHEMA.
+            // {<should create initial commit>, <table type>, <txn 1 writer 
schema>, <txn 2 writer schema>,
+            // <should schema conflict>, <expected table schema after 
resolution>}
+            // --------------|-----read write-----|validate & 
commit|------------------------- txn 1
+            // --------------------------------|------read 
write------|--validate & commit--|- txn 2
+            // 
committed->|------------------------------------------------------------------- 
initial commit (optional)
+
+            // No schema evolution, no conflict.
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            // No concurrent schema evolution, no conflict.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // If there are initial commits defining the table schema as 
TRIP_EXAMPLE_SCHEMA,
+            // as long as txn 2 sticks to that schema, backward compatibility 
handles everything.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // In case there are no initial commits, the table schema is not really 
predefined.
+            // It means we effectively have 2 concurrent txns trying to 
define the table schema
+            // differently in this case.
+            {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // Concurrent schema evolution into the same schema does not 
conflict.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // Concurrent schema evolution into different schemas conflicts.
+            // from clustering operation, instead of 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1 (from commit 2)
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+        };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  /**
+   * Injects a new instant with customizable schema in commit metadata
+   * @param timestamp Instant timestamp
+   * @param schemaAttrValue Schema value to set in commit metadata (can be 
null)
+   */
+  public void injectInstantWithSchema(
+      String timestamp,
+      String action,
+      String schemaAttrValue) throws Exception {
+    HoodieTestTable instantGenerator = HoodieTestTable.of(metaClient);
+    instantGenerator.addCommit(timestamp, Option.of(buildMetadata(
+        Collections.emptyList(),
+        Collections.emptyMap(),
+        Option.empty(),
+        WriteOperationType.UNKNOWN,
+        schemaAttrValue,
+        action)));
+  }
+
+  @ParameterizedTest
+  @MethodSource("concurrentAlterSchemaTestDimension")
+  void testHoodieClientWithSchemaConflictResolution(
+      boolean createInitialCommit,
+      boolean createEmptyInitialCommit,
+      HoodieTableType tableType,
+      String writerSchema1,
+      String writerSchema2,
+      boolean shouldConflict,
+      String expectedTableSchemaAfterResolution) throws Exception {
+    if (tableType.equals(MERGE_ON_READ)) {
+      setUpMORTestTable();
+    }
+
+    Properties properties = new Properties();
+    properties.setProperty(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key(), 
String.valueOf(0));
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, 
"3000");
+    properties.setProperty(ENABLE_SCHEMA_CONFLICT_RESOLUTION.key(), "true");
+
+    HoodieWriteConfig.Builder writeConfigBuilder = 
getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.MEMORY)
+            
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+        .withAutoCommit(false)
+        .withProperties(properties);
+    HoodieWriteConfig writeConfig = writeConfigBuilder.build();
+    final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
+
+    int totalCommits = 0;
+    final String nextCommitTime11 = "0011";
+    final String nextCommitTime12 = "0012";
+    if (createInitialCommit) {
+      // Create the first commit ingesting some data.
+      createCommitWithInserts(writeConfig, client1, "000", nextCommitTime11, 
100, true);
+      createCommitWithUpserts(writeConfig, client1, nextCommitTime11, 
Option.empty(), nextCommitTime12, 100);
+      totalCommits += 2;
+    }
+
+    if (createEmptyInitialCommit) {
+      // Create an empty commit which does not contain a valid schema, schema 
conflict resolution should still be able
+      // to handle it properly. This can happen in delta streamer where no 
data is ingested but empty commit is created
+      // to save the checkpoint.
+      HoodieWriteConfig writeConfig22 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+      writeConfig22.setSchema("\"null\"");
+      final SparkRDDWriteClient client22 = getHoodieWriteClient(writeConfig22);
+      JavaRDD<HoodieRecord> emptyRDD = jsc.emptyRDD();
+      // Perform upsert with empty RDD
+      client22.startCommitWithTime("0013");
+      JavaRDD<WriteStatus> writeStatusRDD = client22.upsert(emptyRDD, "0013");
+      client22.commit("0013", writeStatusRDD);
+      totalCommits += 1;
+
+      // Validate table schema in the end.
+      TableSchemaResolver r = new TableSchemaResolver(metaClient);
+      // Assert no table schema is defined.
+      assertThrows(HoodieSchemaNotFoundException.class, () -> 
r.getTableAvroSchema(false));
+    }
+
+    // Start txn 002 altering table schema
+    HoodieWriteConfig writeConfig2 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+    writeConfig2.setSchema(writerSchema1);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig2);
+    final String nextCommitTime21 = "0021";
+    startSchemaEvolutionTransaction(metaClient, client2, nextCommitTime21, 
tableType);
+
+    // Start concurrent txn 003 alter table schema
+    HoodieWriteConfig writeConfig3 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+    writeConfig3.setSchema(writerSchema2);
+    final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig3);
+    final String nextCommitTime31 = "0031";
+    startSchemaEvolutionTransaction(metaClient, client3, nextCommitTime31, 
tableType);
+
+    Properties props = new TypedProperties();
+    props.setProperty("hoodie.datasource.write.row.writer.enable", "false");

Review Comment:
   Should we add a TODO to address the issue and avoid setting this config?



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -168,13 +189,238 @@ private static Iterable<Object[]> 
providerClassResolutionStrategyAndTableType()
     List<Object[]> opts = new ArrayList<>();
     for (Object providerClass : LOCK_PROVIDER_CLASSES) {
       for (ConflictResolutionStrategy resolutionStrategy : 
CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
-        opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass, 
resolutionStrategy});
-        opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass, 
resolutionStrategy});
+        opts.add(new Object[] {COPY_ON_WRITE, providerClass, 
resolutionStrategy});
+        opts.add(new Object[] {MERGE_ON_READ, providerClass, 
resolutionStrategy});
       }
     }
     return opts;
   }
 
+  public static Stream<Arguments> concurrentAlterSchemaTestDimension() {
+    Object[][] data =
+        new Object[][] {
+            // First element set to true means before testing anything, we 
make a commit
+            // with schema TRIP_EXAMPLE_SCHEMA.
+            // {<should create initial commit>, <table type>, <txn 1 writer 
schema>, <txn 2 writer schema>,
+            // <should schema conflict>, <expected table schema after 
resolution>}
+            // --------------|-----read write-----|validate & 
commit|------------------------- txn 1
+            // --------------------------------|------read 
write------|--validate & commit--|- txn 2
+            // 
committed->|------------------------------------------------------------------- 
initial commit (optional)
+
+            // No schema evolution, no conflict.
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+            // No concurrent schema evolution, no conflict.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // If there are initial commits defining the table schema as 
TRIP_EXAMPLE_SCHEMA,
+            // as long as txn 2 sticks to that schema, backward compatibility 
handles everything.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // In case there are no initial commits, the table schema is not really 
predefined.
+            // It means we effectively have 2 concurrent txns trying to 
define the table schema
+            // differently in this case.
+            {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // Concurrent schema evolution into the same schema does not 
conflict.
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            // Concurrent schema evolution into different schemas conflicts.
+            // from clustering operation, instead of 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1 (from commit 2)
+            {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+            {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+        };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  /**
+   * Injects a new instant with customizable schema in commit metadata
+   * @param timestamp Instant timestamp
+   * @param schemaAttrValue Schema value to set in commit metadata (can be 
null)
+   */
+  public void injectInstantWithSchema(
+      String timestamp,
+      String action,
+      String schemaAttrValue) throws Exception {
+    HoodieTestTable instantGenerator = HoodieTestTable.of(metaClient);
+    instantGenerator.addCommit(timestamp, Option.of(buildMetadata(
+        Collections.emptyList(),
+        Collections.emptyMap(),
+        Option.empty(),
+        WriteOperationType.UNKNOWN,
+        schemaAttrValue,
+        action)));
+  }
+
+  @ParameterizedTest
+  @MethodSource("concurrentAlterSchemaTestDimension")
+  void testHoodieClientWithSchemaConflictResolution(
+      boolean createInitialCommit,
+      boolean createEmptyInitialCommit,
+      HoodieTableType tableType,
+      String writerSchema1,
+      String writerSchema2,
+      boolean shouldConflict,
+      String expectedTableSchemaAfterResolution) throws Exception {
+    if (tableType.equals(MERGE_ON_READ)) {
+      setUpMORTestTable();
+    }
+
+    Properties properties = new Properties();
+    properties.setProperty(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key(), 
String.valueOf(0));
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, 
"3000");
+    properties.setProperty(ENABLE_SCHEMA_CONFLICT_RESOLUTION.key(), "true");
+
+    HoodieWriteConfig.Builder writeConfigBuilder = 
getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.MEMORY)
+            
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+        .withAutoCommit(false)
+        .withProperties(properties);
+    HoodieWriteConfig writeConfig = writeConfigBuilder.build();
+    final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
+
+    int totalCommits = 0;
+    final String nextCommitTime11 = "0011";
+    final String nextCommitTime12 = "0012";
+    if (createInitialCommit) {
+      // Create the first commit ingesting some data.
+      createCommitWithInserts(writeConfig, client1, "000", nextCommitTime11, 
100, true);
+      createCommitWithUpserts(writeConfig, client1, nextCommitTime11, 
Option.empty(), nextCommitTime12, 100);
+      totalCommits += 2;
+    }
+
+    if (createEmptyInitialCommit) {
+      // Create an empty commit which does not contain a valid schema, schema 
conflict resolution should still be able
+      // to handle it properly. This can happen in delta streamer where no 
data is ingested but empty commit is created
+      // to save the checkpoint.
+      HoodieWriteConfig writeConfig22 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+      writeConfig22.setSchema("\"null\"");
+      final SparkRDDWriteClient client22 = getHoodieWriteClient(writeConfig22);
+      JavaRDD<HoodieRecord> emptyRDD = jsc.emptyRDD();
+      // Perform upsert with empty RDD
+      client22.startCommitWithTime("0013");
+      JavaRDD<WriteStatus> writeStatusRDD = client22.upsert(emptyRDD, "0013");
+      client22.commit("0013", writeStatusRDD);
+      totalCommits += 1;
+
+      // Validate table schema in the end.
+      TableSchemaResolver r = new TableSchemaResolver(metaClient);
+      // Assert no table schema is defined.
+      assertThrows(HoodieSchemaNotFoundException.class, () -> 
r.getTableAvroSchema(false));
+    }
+
+    // Start txn 002 altering table schema
+    HoodieWriteConfig writeConfig2 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+    writeConfig2.setSchema(writerSchema1);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig2);
+    final String nextCommitTime21 = "0021";
+    startSchemaEvolutionTransaction(metaClient, client2, nextCommitTime21, 
tableType);
+
+    // Start concurrent txn 003 alter table schema
+    HoodieWriteConfig writeConfig3 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+    writeConfig3.setSchema(writerSchema2);
+    final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig3);
+    final String nextCommitTime31 = "0031";
+    startSchemaEvolutionTransaction(metaClient, client3, nextCommitTime31, 
tableType);
+
+    Properties props = new TypedProperties();
+    props.setProperty("hoodie.datasource.write.row.writer.enable", "false");
+    HoodieWriteConfig clusterWriteCfg = writeConfigBuilder
+        .withProperties(props)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(true)

Review Comment:
   Should this be `false` as the test intends to run clustering only?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to