yihua commented on code in PR #12781:
URL: https://github.com/apache/hudi/pull/12781#discussion_r1964413907
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -168,13 +189,238 @@ private static Iterable<Object[]>
providerClassResolutionStrategyAndTableType()
List<Object[]> opts = new ArrayList<>();
for (Object providerClass : LOCK_PROVIDER_CLASSES) {
for (ConflictResolutionStrategy resolutionStrategy :
CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
- opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass,
resolutionStrategy});
- opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass,
resolutionStrategy});
+ opts.add(new Object[] {COPY_ON_WRITE, providerClass,
resolutionStrategy});
+ opts.add(new Object[] {MERGE_ON_READ, providerClass,
resolutionStrategy});
}
}
return opts;
}
+ public static Stream<Arguments> concurrentAlterSchemaTestDimension() {
+ Object[][] data =
+ new Object[][] {
+ // First element set to true means before testing anything, we
make a commit
+ // with schema TRIP_EXAMPLE_SCHEMA.
+ // {<should create initial commit>, <table type>, <txn 1 writer
schema>, <txn 2 writer schema>,
+ // <should schema conflict>, <expected table schema after
resolution>}
+ // --------------|-----read write-----|validate &
commit|------------------------- txn 1
+ // --------------------------------|------read
write------|--validate & commit--|- txn 2
+ //
committed->|-------------------------------------------------------------------
initial commit (optional)
+
+ // No schema evolution, no conflict.
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ // No concurrent schema evolution, no conflict.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+          // If there is an initial commit defining the table schema as
TRIP_EXAMPLE_SCHEMA,
+          // as long as txn 2 sticks to that schema, backwards compatibility
handles everything.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+          // In case of no initial commits, the table schema is not really
predefined.
+          // It means we are effectively having 2 concurrent txns trying to
define the table schema
+          // differently in this case.
+ {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ // Concurrent schema evolution into the same schema does not
conflict.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ // Concurrent schema evolution into different schemas conflicts.
+ // from clustering operation, instead of
TRIP_EXAMPLE_SCHEMA_EVOLVED_1 (from commit 2)
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ };
+ return Stream.of(data).map(Arguments::of);
+ }
+
+ /**
+ * Injects a new instant with customizable schema in commit metadata
+ * @param timestamp Instant timestamp
+ * @param schemaAttrValue Schema value to set in commit metadata (can be
null)
+ */
+ public void injectInstantWithSchema(
+ String timestamp,
+ String action,
+ String schemaAttrValue) throws Exception {
+ HoodieTestTable instantGenerator = HoodieTestTable.of(metaClient);
+ instantGenerator.addCommit(timestamp, Option.of(buildMetadata(
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ Option.empty(),
+ WriteOperationType.UNKNOWN,
+ schemaAttrValue,
+ action)));
+ }
+
+ @ParameterizedTest
+ @MethodSource("concurrentAlterSchemaTestDimension")
+ void testHoodieClientWithSchemaConflictResolution(
+ boolean createInitialCommit,
+ boolean createEmptyInitialCommit,
+ HoodieTableType tableType,
+ String writerSchema1,
+ String writerSchema2,
+ boolean shouldConflict,
+ String expectedTableSchemaAfterResolution) throws Exception {
+ if (tableType.equals(MERGE_ON_READ)) {
+ setUpMORTestTable();
+ }
+
+ Properties properties = new Properties();
+ properties.setProperty(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key(),
String.valueOf(0));
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+ properties.setProperty(ENABLE_SCHEMA_CONFLICT_RESOLUTION.key(), "true");
+
+ HoodieWriteConfig.Builder writeConfigBuilder =
getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+ .withAutoCommit(false)
+ .withProperties(properties);
+ HoodieWriteConfig writeConfig = writeConfigBuilder.build();
+ final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
+
+ int totalCommits = 0;
+ final String nextCommitTime11 = "0011";
+ final String nextCommitTime12 = "0012";
+ if (createInitialCommit) {
+ // Create the first commit ingesting some data.
+ createCommitWithInserts(writeConfig, client1, "000", nextCommitTime11,
100, true);
+ createCommitWithUpserts(writeConfig, client1, nextCommitTime11,
Option.empty(), nextCommitTime12, 100);
+ totalCommits += 2;
+ }
+
+ if (createEmptyInitialCommit) {
+ // Create an empty commit which does not contain a valid schema, schema
conflict resolution should still be able
+ // to handle it properly. This can happen in delta streamer where no
data is ingested but empty commit is created
+ // to save the checkpoint.
+ HoodieWriteConfig writeConfig22 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig22.setSchema("\"null\"");
+ final SparkRDDWriteClient client22 = getHoodieWriteClient(writeConfig22);
+ JavaRDD<HoodieRecord> emptyRDD = jsc.emptyRDD();
+ // Perform upsert with empty RDD
+ client22.startCommitWithTime("0013");
+ JavaRDD<WriteStatus> writeStatusRDD = client22.upsert(emptyRDD, "0013");
+ client22.commit("0013", writeStatusRDD);
+ totalCommits += 1;
+
+ // Validate table schema in the end.
+ TableSchemaResolver r = new TableSchemaResolver(metaClient);
+ // Assert no table schema is defined.
+ assertThrows(HoodieSchemaNotFoundException.class, () ->
r.getTableAvroSchema(false));
+ }
+
+ // Start txn 002 altering table schema
+ HoodieWriteConfig writeConfig2 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig2.setSchema(writerSchema1);
+ final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig2);
+ final String nextCommitTime21 = "0021";
+ startSchemaEvolutionTransaction(metaClient, client2, nextCommitTime21,
tableType);
+
+ // Start concurrent txn 003 alter table schema
+ HoodieWriteConfig writeConfig3 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig3.setSchema(writerSchema2);
+ final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig3);
+ final String nextCommitTime31 = "0031";
+ startSchemaEvolutionTransaction(metaClient, client3, nextCommitTime31,
tableType);
+
+ Properties props = new TypedProperties();
+ props.setProperty("hoodie.datasource.write.row.writer.enable", "false");
+ HoodieWriteConfig clusterWriteCfg = writeConfigBuilder
+ .withProperties(props)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(true)
+ .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withInlineClustering(true)
Review Comment:
Should this be `withScheduleInlineClustering` as you want to schedule the
clustering without executing it in this test for validating the schema conflict
resolution with async clustering?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -1081,11 +1327,23 @@ private JavaRDD<WriteStatus>
createCommitWithInserts(HoodieWriteConfig cfg, Spar
return result;
}
+ private static void startSchemaEvolutionTransaction(HoodieTableMetaClient
metaClient, SparkRDDWriteClient client, String nextCommitTime2, HoodieTableType
tableType) throws IOException {
+ String commitActionType =
CommitUtils.getCommitActionType(WriteOperationType.UPSERT, tableType);
+ client.startCommitWithTime(nextCommitTime2, commitActionType);
+ client.preWrite(nextCommitTime2, WriteOperationType.UPSERT,
client.createMetaClient(true));
+ HoodieInstant requested =
metaClient.createNewInstant(HoodieInstant.State.REQUESTED, commitActionType,
nextCommitTime2);
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ metadata.setOperationType(WriteOperationType.UPSERT);
+
client.createMetaClient(true).getActiveTimeline().transitionRequestedToInflight(
+ requested,
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
Review Comment:
The timeline in table version 8 (default) uses Avro to serialize the commit
metadata. Has that changed here, or are the test utils outdated?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -168,13 +189,238 @@ private static Iterable<Object[]>
providerClassResolutionStrategyAndTableType()
List<Object[]> opts = new ArrayList<>();
for (Object providerClass : LOCK_PROVIDER_CLASSES) {
for (ConflictResolutionStrategy resolutionStrategy :
CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
- opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass,
resolutionStrategy});
- opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass,
resolutionStrategy});
+ opts.add(new Object[] {COPY_ON_WRITE, providerClass,
resolutionStrategy});
+ opts.add(new Object[] {MERGE_ON_READ, providerClass,
resolutionStrategy});
}
}
return opts;
}
+ public static Stream<Arguments> concurrentAlterSchemaTestDimension() {
+ Object[][] data =
+ new Object[][] {
+ // First element set to true means before testing anything, we
make a commit
+ // with schema TRIP_EXAMPLE_SCHEMA.
+ // {<should create initial commit>, <table type>, <txn 1 writer
schema>, <txn 2 writer schema>,
+ // <should schema conflict>, <expected table schema after
resolution>}
+ // --------------|-----read write-----|validate &
commit|------------------------- txn 1
+ // --------------------------------|------read
write------|--validate & commit--|- txn 2
+ //
committed->|-------------------------------------------------------------------
initial commit (optional)
+
+ // No schema evolution, no conflict.
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ // No concurrent schema evolution, no conflict.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+          // If there is an initial commit defining the table schema as
TRIP_EXAMPLE_SCHEMA,
+          // as long as txn 2 sticks to that schema, backwards compatibility
handles everything.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+          // In case of no initial commits, the table schema is not really
predefined.
+          // It means we are effectively having 2 concurrent txns trying to
define the table schema
+          // differently in this case.
+ {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ // Concurrent schema evolution into the same schema does not
conflict.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ // Concurrent schema evolution into different schemas conflicts.
+ // from clustering operation, instead of
TRIP_EXAMPLE_SCHEMA_EVOLVED_1 (from commit 2)
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ };
+ return Stream.of(data).map(Arguments::of);
+ }
+
+ /**
+ * Injects a new instant with customizable schema in commit metadata
+ * @param timestamp Instant timestamp
+ * @param schemaAttrValue Schema value to set in commit metadata (can be
null)
+ */
+ public void injectInstantWithSchema(
+ String timestamp,
+ String action,
+ String schemaAttrValue) throws Exception {
+ HoodieTestTable instantGenerator = HoodieTestTable.of(metaClient);
+ instantGenerator.addCommit(timestamp, Option.of(buildMetadata(
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ Option.empty(),
+ WriteOperationType.UNKNOWN,
+ schemaAttrValue,
+ action)));
+ }
+
+ @ParameterizedTest
+ @MethodSource("concurrentAlterSchemaTestDimension")
+ void testHoodieClientWithSchemaConflictResolution(
+ boolean createInitialCommit,
+ boolean createEmptyInitialCommit,
+ HoodieTableType tableType,
+ String writerSchema1,
+ String writerSchema2,
+ boolean shouldConflict,
+ String expectedTableSchemaAfterResolution) throws Exception {
+ if (tableType.equals(MERGE_ON_READ)) {
+ setUpMORTestTable();
+ }
+
+ Properties properties = new Properties();
+ properties.setProperty(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key(),
String.valueOf(0));
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+ properties.setProperty(ENABLE_SCHEMA_CONFLICT_RESOLUTION.key(), "true");
+
+ HoodieWriteConfig.Builder writeConfigBuilder =
getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+ .withAutoCommit(false)
+ .withProperties(properties);
+ HoodieWriteConfig writeConfig = writeConfigBuilder.build();
+ final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
+
+ int totalCommits = 0;
+ final String nextCommitTime11 = "0011";
+ final String nextCommitTime12 = "0012";
+ if (createInitialCommit) {
+ // Create the first commit ingesting some data.
+ createCommitWithInserts(writeConfig, client1, "000", nextCommitTime11,
100, true);
+ createCommitWithUpserts(writeConfig, client1, nextCommitTime11,
Option.empty(), nextCommitTime12, 100);
+ totalCommits += 2;
+ }
+
+ if (createEmptyInitialCommit) {
+ // Create an empty commit which does not contain a valid schema, schema
conflict resolution should still be able
+ // to handle it properly. This can happen in delta streamer where no
data is ingested but empty commit is created
+ // to save the checkpoint.
+ HoodieWriteConfig writeConfig22 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig22.setSchema("\"null\"");
+ final SparkRDDWriteClient client22 = getHoodieWriteClient(writeConfig22);
+ JavaRDD<HoodieRecord> emptyRDD = jsc.emptyRDD();
+ // Perform upsert with empty RDD
+ client22.startCommitWithTime("0013");
+ JavaRDD<WriteStatus> writeStatusRDD = client22.upsert(emptyRDD, "0013");
+ client22.commit("0013", writeStatusRDD);
+ totalCommits += 1;
+
+ // Validate table schema in the end.
+ TableSchemaResolver r = new TableSchemaResolver(metaClient);
+ // Assert no table schema is defined.
+ assertThrows(HoodieSchemaNotFoundException.class, () ->
r.getTableAvroSchema(false));
+ }
+
+ // Start txn 002 altering table schema
+ HoodieWriteConfig writeConfig2 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig2.setSchema(writerSchema1);
+ final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig2);
+ final String nextCommitTime21 = "0021";
+ startSchemaEvolutionTransaction(metaClient, client2, nextCommitTime21,
tableType);
+
+ // Start concurrent txn 003 alter table schema
+ HoodieWriteConfig writeConfig3 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig3.setSchema(writerSchema2);
+ final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig3);
+ final String nextCommitTime31 = "0031";
+ startSchemaEvolutionTransaction(metaClient, client3, nextCommitTime31,
tableType);
+
+ Properties props = new TypedProperties();
+ props.setProperty("hoodie.datasource.write.row.writer.enable", "false");
Review Comment:
Should we add a TODO to address the issue and avoid setting this config?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -168,13 +189,238 @@ private static Iterable<Object[]>
providerClassResolutionStrategyAndTableType()
List<Object[]> opts = new ArrayList<>();
for (Object providerClass : LOCK_PROVIDER_CLASSES) {
for (ConflictResolutionStrategy resolutionStrategy :
CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
- opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass,
resolutionStrategy});
- opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass,
resolutionStrategy});
+ opts.add(new Object[] {COPY_ON_WRITE, providerClass,
resolutionStrategy});
+ opts.add(new Object[] {MERGE_ON_READ, providerClass,
resolutionStrategy});
}
}
return opts;
}
+ public static Stream<Arguments> concurrentAlterSchemaTestDimension() {
+ Object[][] data =
+ new Object[][] {
+ // First element set to true means before testing anything, we
make a commit
+ // with schema TRIP_EXAMPLE_SCHEMA.
+ // {<should create initial commit>, <table type>, <txn 1 writer
schema>, <txn 2 writer schema>,
+ // <should schema conflict>, <expected table schema after
resolution>}
+ // --------------|-----read write-----|validate &
commit|------------------------- txn 1
+ // --------------------------------|------read
write------|--validate & commit--|- txn 2
+ //
committed->|-------------------------------------------------------------------
initial commit (optional)
+
+ // No schema evolution, no conflict.
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ // No concurrent schema evolution, no conflict.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+          // If there is an initial commit defining the table schema as
TRIP_EXAMPLE_SCHEMA,
+          // as long as txn 2 sticks to that schema, backwards compatibility
handles everything.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+          // In case of no initial commits, the table schema is not really
predefined.
+          // It means we are effectively having 2 concurrent txns trying to
define the table schema
+          // differently in this case.
+ {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ // Concurrent schema evolution into the same schema does not
conflict.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ // Concurrent schema evolution into different schemas conflicts.
+ // from clustering operation, instead of
TRIP_EXAMPLE_SCHEMA_EVOLVED_1 (from commit 2)
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ };
+ return Stream.of(data).map(Arguments::of);
+ }
+
+ /**
+ * Injects a new instant with customizable schema in commit metadata
+ * @param timestamp Instant timestamp
+ * @param schemaAttrValue Schema value to set in commit metadata (can be
null)
+ */
+ public void injectInstantWithSchema(
+ String timestamp,
+ String action,
+ String schemaAttrValue) throws Exception {
+ HoodieTestTable instantGenerator = HoodieTestTable.of(metaClient);
+ instantGenerator.addCommit(timestamp, Option.of(buildMetadata(
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ Option.empty(),
+ WriteOperationType.UNKNOWN,
+ schemaAttrValue,
+ action)));
+ }
+
+ @ParameterizedTest
+ @MethodSource("concurrentAlterSchemaTestDimension")
+ void testHoodieClientWithSchemaConflictResolution(
+ boolean createInitialCommit,
+ boolean createEmptyInitialCommit,
+ HoodieTableType tableType,
+ String writerSchema1,
+ String writerSchema2,
+ boolean shouldConflict,
+ String expectedTableSchemaAfterResolution) throws Exception {
+ if (tableType.equals(MERGE_ON_READ)) {
+ setUpMORTestTable();
+ }
+
+ Properties properties = new Properties();
+ properties.setProperty(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key(),
String.valueOf(0));
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+ properties.setProperty(ENABLE_SCHEMA_CONFLICT_RESOLUTION.key(), "true");
+
+ HoodieWriteConfig.Builder writeConfigBuilder =
getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+ .withAutoCommit(false)
+ .withProperties(properties);
+ HoodieWriteConfig writeConfig = writeConfigBuilder.build();
+ final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
+
+ int totalCommits = 0;
+ final String nextCommitTime11 = "0011";
+ final String nextCommitTime12 = "0012";
+ if (createInitialCommit) {
+ // Create the first commit ingesting some data.
+ createCommitWithInserts(writeConfig, client1, "000", nextCommitTime11,
100, true);
+ createCommitWithUpserts(writeConfig, client1, nextCommitTime11,
Option.empty(), nextCommitTime12, 100);
+ totalCommits += 2;
+ }
+
+ if (createEmptyInitialCommit) {
+ // Create an empty commit which does not contain a valid schema, schema
conflict resolution should still be able
+ // to handle it properly. This can happen in delta streamer where no
data is ingested but empty commit is created
+ // to save the checkpoint.
+ HoodieWriteConfig writeConfig22 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig22.setSchema("\"null\"");
+ final SparkRDDWriteClient client22 = getHoodieWriteClient(writeConfig22);
+ JavaRDD<HoodieRecord> emptyRDD = jsc.emptyRDD();
+ // Perform upsert with empty RDD
+ client22.startCommitWithTime("0013");
+ JavaRDD<WriteStatus> writeStatusRDD = client22.upsert(emptyRDD, "0013");
+ client22.commit("0013", writeStatusRDD);
+ totalCommits += 1;
+
+ // Validate table schema in the end.
+ TableSchemaResolver r = new TableSchemaResolver(metaClient);
+ // Assert no table schema is defined.
+ assertThrows(HoodieSchemaNotFoundException.class, () ->
r.getTableAvroSchema(false));
+ }
+
+ // Start txn 002 altering table schema
+ HoodieWriteConfig writeConfig2 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig2.setSchema(writerSchema1);
+ final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig2);
+ final String nextCommitTime21 = "0021";
+ startSchemaEvolutionTransaction(metaClient, client2, nextCommitTime21,
tableType);
+
+ // Start concurrent txn 003 alter table schema
+ HoodieWriteConfig writeConfig3 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig3.setSchema(writerSchema2);
+ final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig3);
+ final String nextCommitTime31 = "0031";
+ startSchemaEvolutionTransaction(metaClient, client3, nextCommitTime31,
tableType);
+
+ Properties props = new TypedProperties();
+ props.setProperty("hoodie.datasource.write.row.writer.enable", "false");
+ HoodieWriteConfig clusterWriteCfg = writeConfigBuilder
+ .withProperties(props)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(true)
Review Comment:
Should this be `false` as the test intends to run clustering only?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]