Davis-Zhang-Onehouse commented on code in PR #12781:
URL: https://github.com/apache/hudi/pull/12781#discussion_r1964569457
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java:
##########
@@ -167,6 +167,14 @@ protected void initMetaClient(boolean preTableVersion8)
throws IOException {
preTableVersion8 ? Option.of(HoodieTableVersion.SIX) :
Option.of(HoodieTableVersion.current()));
}
+ protected void initMetaClient(boolean preTableVersion8, HoodieTableType
tableType) throws IOException {
Review Comment:
done
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -298,12 +296,17 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>>
executeClustering(HoodieC
// Disable auto commit. Strategy is only expected to write data in new
files.
config.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE,
Boolean.FALSE.toString());
- final Schema schema = new
TableSchemaResolver(table.getMetaClient()).getTableAvroSchemaForClustering(false).get();
+ Option<Schema> schema;
+ try {
+ schema = new
TableSchemaGetter(table.getMetaClient()).getTableAvroSchemaIfPresent(false);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
Review Comment:
changed to table schema resolver
##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -140,8 +140,14 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
+ "{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},"
+
"{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},";
+ public static final String EXTRA_COL_SCHEMA1 = "{\"name\":
\"extra_column1\", \"type\": [\"null\", \"string\"], \"default\": null },";
+ public static final String EXTRA_COL_SCHEMA2 = "{\"name\":
\"extra_column2\", \"type\": [\"null\", \"string\"], \"default\": null},";
Review Comment:
Hudi does not support non-null default values well; there are a few caveats.
We prefer defaulting to null or having no default value at all. I can walk you
through the reasons offline.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -168,13 +189,238 @@ private static Iterable<Object[]>
providerClassResolutionStrategyAndTableType()
List<Object[]> opts = new ArrayList<>();
for (Object providerClass : LOCK_PROVIDER_CLASSES) {
for (ConflictResolutionStrategy resolutionStrategy :
CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
- opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass,
resolutionStrategy});
- opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass,
resolutionStrategy});
+ opts.add(new Object[] {COPY_ON_WRITE, providerClass,
resolutionStrategy});
+ opts.add(new Object[] {MERGE_ON_READ, providerClass,
resolutionStrategy});
}
}
return opts;
}
+ public static Stream<Arguments> concurrentAlterSchemaTestDimension() {
+ Object[][] data =
+ new Object[][] {
+ // First element set to true means before testing anything, we
make a commit
+ // with schema TRIP_EXAMPLE_SCHEMA.
+ // {<should create initial commit>, <table type>, <txn 1 writer
schema>, <txn 2 writer schema>,
+ // <should schema conflict>, <expected table schema after
resolution>}
+ // --------------|-----read write-----|validate &
commit|------------------------- txn 1
+ // --------------------------------|------read
write------|--validate & commit--|- txn 2
+ //
committed->|-------------------------------------------------------------------
initial commit (optional)
+
+ // No schema evolution, no conflict.
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ // No concurrent schema evolution, no conflict.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+          // If there is an initial commit defining the table schema as
TRIP_EXAMPLE_SCHEMA,
+          // as long as txn 2 sticks to that schema, backwards compatibility
handles everything.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+          // In case there are no initial commits, the table schema is not
really predefined.
+          // It means we effectively have 2 concurrent txns trying to
define the table schema
+          // differently in this case.
+ {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ // Concurrent schema evolution into the same schema does not
conflict.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ // Concurrent schema evolution into different schemas conflicts.
+ // from clustering operation, instead of
TRIP_EXAMPLE_SCHEMA_EVOLVED_1 (from commit 2)
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ };
+ return Stream.of(data).map(Arguments::of);
+ }
+
+ /**
+ * Injects a new instant with customizable schema in commit metadata
+ * @param timestamp Instant timestamp
+ * @param schemaAttrValue Schema value to set in commit metadata (can be
null)
+ */
+ public void injectInstantWithSchema(
+ String timestamp,
+ String action,
+ String schemaAttrValue) throws Exception {
+ HoodieTestTable instantGenerator = HoodieTestTable.of(metaClient);
+ instantGenerator.addCommit(timestamp, Option.of(buildMetadata(
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ Option.empty(),
+ WriteOperationType.UNKNOWN,
+ schemaAttrValue,
+ action)));
+ }
+
+ @ParameterizedTest
+ @MethodSource("concurrentAlterSchemaTestDimension")
+ void testHoodieClientWithSchemaConflictResolution(
+ boolean createInitialCommit,
+ boolean createEmptyInitialCommit,
+ HoodieTableType tableType,
+ String writerSchema1,
+ String writerSchema2,
+ boolean shouldConflict,
+ String expectedTableSchemaAfterResolution) throws Exception {
+ if (tableType.equals(MERGE_ON_READ)) {
+ setUpMORTestTable();
+ }
+
+ Properties properties = new Properties();
+ properties.setProperty(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key(),
String.valueOf(0));
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+ properties.setProperty(ENABLE_SCHEMA_CONFLICT_RESOLUTION.key(), "true");
+
+ HoodieWriteConfig.Builder writeConfigBuilder =
getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+ .withAutoCommit(false)
+ .withProperties(properties);
+ HoodieWriteConfig writeConfig = writeConfigBuilder.build();
+ final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
+
+ int totalCommits = 0;
+ final String nextCommitTime11 = "0011";
+ final String nextCommitTime12 = "0012";
+ if (createInitialCommit) {
+ // Create the first commit ingesting some data.
+ createCommitWithInserts(writeConfig, client1, "000", nextCommitTime11,
100, true);
+ createCommitWithUpserts(writeConfig, client1, nextCommitTime11,
Option.empty(), nextCommitTime12, 100);
+ totalCommits += 2;
+ }
+
+ if (createEmptyInitialCommit) {
+ // Create an empty commit which does not contain a valid schema, schema
conflict resolution should still be able
+ // to handle it properly. This can happen in delta streamer where no
data is ingested but empty commit is created
+ // to save the checkpoint.
+ HoodieWriteConfig writeConfig22 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig22.setSchema("\"null\"");
+ final SparkRDDWriteClient client22 = getHoodieWriteClient(writeConfig22);
+ JavaRDD<HoodieRecord> emptyRDD = jsc.emptyRDD();
+ // Perform upsert with empty RDD
+ client22.startCommitWithTime("0013");
+ JavaRDD<WriteStatus> writeStatusRDD = client22.upsert(emptyRDD, "0013");
+ client22.commit("0013", writeStatusRDD);
+ totalCommits += 1;
+
+ // Validate table schema in the end.
+ TableSchemaResolver r = new TableSchemaResolver(metaClient);
+ // Assert no table schema is defined.
+ assertThrows(HoodieSchemaNotFoundException.class, () ->
r.getTableAvroSchema(false));
+ }
+
+ // Start txn 002 altering table schema
+ HoodieWriteConfig writeConfig2 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig2.setSchema(writerSchema1);
+ final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig2);
+ final String nextCommitTime21 = "0021";
+ startSchemaEvolutionTransaction(metaClient, client2, nextCommitTime21,
tableType);
+
+ // Start concurrent txn 003 alter table schema
+ HoodieWriteConfig writeConfig3 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig3.setSchema(writerSchema2);
+ final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig3);
+ final String nextCommitTime31 = "0031";
+ startSchemaEvolutionTransaction(metaClient, client3, nextCommitTime31,
tableType);
+
+ Properties props = new TypedProperties();
+ props.setProperty("hoodie.datasource.write.row.writer.enable", "false");
+ HoodieWriteConfig clusterWriteCfg = writeConfigBuilder
+ .withProperties(props)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(true)
+ .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withInlineClustering(true)
Review Comment:
oh yeah, removed
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -168,13 +189,238 @@ private static Iterable<Object[]>
providerClassResolutionStrategyAndTableType()
List<Object[]> opts = new ArrayList<>();
for (Object providerClass : LOCK_PROVIDER_CLASSES) {
for (ConflictResolutionStrategy resolutionStrategy :
CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
- opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass,
resolutionStrategy});
- opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass,
resolutionStrategy});
+ opts.add(new Object[] {COPY_ON_WRITE, providerClass,
resolutionStrategy});
+ opts.add(new Object[] {MERGE_ON_READ, providerClass,
resolutionStrategy});
}
}
return opts;
}
+ public static Stream<Arguments> concurrentAlterSchemaTestDimension() {
+ Object[][] data =
+ new Object[][] {
+ // First element set to true means before testing anything, we
make a commit
+ // with schema TRIP_EXAMPLE_SCHEMA.
+ // {<should create initial commit>, <table type>, <txn 1 writer
schema>, <txn 2 writer schema>,
+ // <should schema conflict>, <expected table schema after
resolution>}
+ // --------------|-----read write-----|validate &
commit|------------------------- txn 1
+ // --------------------------------|------read
write------|--validate & commit--|- txn 2
+ //
committed->|-------------------------------------------------------------------
initial commit (optional)
+
+ // No schema evolution, no conflict.
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ // No concurrent schema evolution, no conflict.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+          // If there is an initial commit defining the table schema as
TRIP_EXAMPLE_SCHEMA,
+          // as long as txn 2 sticks to that schema, backwards compatibility
handles everything.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+          // In case there are no initial commits, the table schema is not
really predefined.
+          // It means we effectively have 2 concurrent txns trying to
define the table schema
+          // differently in this case.
+ {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ // Concurrent schema evolution into the same schema does not
conflict.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ // Concurrent schema evolution into different schemas conflicts.
+ // from clustering operation, instead of
TRIP_EXAMPLE_SCHEMA_EVOLVED_1 (from commit 2)
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ };
+ return Stream.of(data).map(Arguments::of);
+ }
+
+ /**
+ * Injects a new instant with customizable schema in commit metadata
+ * @param timestamp Instant timestamp
+ * @param schemaAttrValue Schema value to set in commit metadata (can be
null)
+ */
+ public void injectInstantWithSchema(
+ String timestamp,
+ String action,
+ String schemaAttrValue) throws Exception {
+ HoodieTestTable instantGenerator = HoodieTestTable.of(metaClient);
+ instantGenerator.addCommit(timestamp, Option.of(buildMetadata(
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ Option.empty(),
+ WriteOperationType.UNKNOWN,
+ schemaAttrValue,
+ action)));
+ }
+
+ @ParameterizedTest
+ @MethodSource("concurrentAlterSchemaTestDimension")
+ void testHoodieClientWithSchemaConflictResolution(
+ boolean createInitialCommit,
+ boolean createEmptyInitialCommit,
+ HoodieTableType tableType,
+ String writerSchema1,
+ String writerSchema2,
+ boolean shouldConflict,
+ String expectedTableSchemaAfterResolution) throws Exception {
+ if (tableType.equals(MERGE_ON_READ)) {
+ setUpMORTestTable();
+ }
+
+ Properties properties = new Properties();
+ properties.setProperty(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key(),
String.valueOf(0));
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+ properties.setProperty(ENABLE_SCHEMA_CONFLICT_RESOLUTION.key(), "true");
+
+ HoodieWriteConfig.Builder writeConfigBuilder =
getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+ .withAutoCommit(false)
+ .withProperties(properties);
+ HoodieWriteConfig writeConfig = writeConfigBuilder.build();
+ final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
+
+ int totalCommits = 0;
+ final String nextCommitTime11 = "0011";
+ final String nextCommitTime12 = "0012";
+ if (createInitialCommit) {
+ // Create the first commit ingesting some data.
+ createCommitWithInserts(writeConfig, client1, "000", nextCommitTime11,
100, true);
+ createCommitWithUpserts(writeConfig, client1, nextCommitTime11,
Option.empty(), nextCommitTime12, 100);
+ totalCommits += 2;
+ }
+
+ if (createEmptyInitialCommit) {
+ // Create an empty commit which does not contain a valid schema, schema
conflict resolution should still be able
+ // to handle it properly. This can happen in delta streamer where no
data is ingested but empty commit is created
+ // to save the checkpoint.
+ HoodieWriteConfig writeConfig22 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig22.setSchema("\"null\"");
+ final SparkRDDWriteClient client22 = getHoodieWriteClient(writeConfig22);
+ JavaRDD<HoodieRecord> emptyRDD = jsc.emptyRDD();
+ // Perform upsert with empty RDD
+ client22.startCommitWithTime("0013");
+ JavaRDD<WriteStatus> writeStatusRDD = client22.upsert(emptyRDD, "0013");
+ client22.commit("0013", writeStatusRDD);
+ totalCommits += 1;
+
+ // Validate table schema in the end.
+ TableSchemaResolver r = new TableSchemaResolver(metaClient);
+ // Assert no table schema is defined.
+ assertThrows(HoodieSchemaNotFoundException.class, () ->
r.getTableAvroSchema(false));
+ }
+
+ // Start txn 002 altering table schema
+ HoodieWriteConfig writeConfig2 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig2.setSchema(writerSchema1);
+ final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig2);
+ final String nextCommitTime21 = "0021";
+ startSchemaEvolutionTransaction(metaClient, client2, nextCommitTime21,
tableType);
+
+ // Start concurrent txn 003 alter table schema
+ HoodieWriteConfig writeConfig3 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig3.setSchema(writerSchema2);
+ final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig3);
+ final String nextCommitTime31 = "0031";
+ startSchemaEvolutionTransaction(metaClient, client3, nextCommitTime31,
tableType);
+
+ Properties props = new TypedProperties();
+ props.setProperty("hoodie.datasource.write.row.writer.enable", "false");
Review Comment:
removed and no issue with the test
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -300,6 +300,13 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Schema string representing the latest schema of the
table. Hudi passes this to "
+ "implementations of evolution of schema");
+ public static final ConfigProperty<Boolean>
ENABLE_SCHEMA_CONFLICT_RESOLUTION = ConfigProperty
+ .key(CONCURRENCY_PREFIX + "schema.conflict.resolution.enable")
+ .defaultValue(false)
+ .markAdvanced()
+ .sinceVersion("1.0.2")
Review Comment:
done
##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -185,43 +176,32 @@ public Option<Schema> getTableAvroSchemaIfPresent(boolean
includeMetadataFields)
return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
}
- Option<Schema> getTableAvroSchemaInternal(boolean includeMetadataFields,
Option<HoodieInstant> instantOpt) {
- return (instantOpt.isPresent()
- ? getTableSchemaFromCommitMetadata(instantOpt.get(),
includeMetadataFields)
- : getTableSchemaFromLatestCommitMetadata(includeMetadataFields))
- .or(() -> getTableCreateSchemaWithMetadata(includeMetadataFields))
- .or(() -> getSchemaFromDataFileIfPresent(includeMetadataFields))
- .map(this::handlePartitionColumnsIfNeeded);
- }
-
- /**
- * Retrieves the table creation schema with metadata fields and partition
columns handled.
- *
- * @param includeMetadataFields whether to include metadata fields in the
schema
- * @return Option containing the fully processed schema if available, empty
Option otherwise
- */
- public Option<Schema> getTableCreateSchemaWithMetadata(boolean
includeMetadataFields) {
- return metaClient.getTableConfig().getTableCreateSchema()
- .map(tableSchema ->
- includeMetadataFields
- ? HoodieAvroUtils.addMetadataFields(tableSchema,
hasOperationField.get())
- : tableSchema)
- .map(this::handlePartitionColumnsIfNeeded);
- }
-
- /**
- * Handles partition column logic for a given schema.
- *
- * @param schema the input schema to process
- * @return the processed schema with partition columns handled appropriately
- */
- private Schema handlePartitionColumnsIfNeeded(Schema schema) {
- if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
+ private Option<Schema> getTableAvroSchemaInternal(boolean
includeMetadataFields, Option<HoodieInstant> instantOpt) {
Review Comment:
yes, as we discussed, no changes to the old table schema resolver
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java:
##########
@@ -53,6 +54,12 @@ public HoodieReplaceCommitMetadata(boolean compacted) {
partitionToReplaceFileIds = new HashMap<>();
}
+ @VisibleForTesting
+ public HoodieReplaceCommitMetadata(HoodieCommitMetadata metadata) {
+ super(metadata);
+ partitionToReplaceFileIds = new HashMap<>();
+ }
Review Comment:
This is reverted in the latest commit; when I checked out the branch, I could
not find this change.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java:
##########
@@ -3829,7 +3829,7 @@ protected HoodieTableType getTableType() {
* TODO: Fix this and increase test coverage to include clustering via row
writers
* @return
*/
- private static Properties getDisabledRowWriterProperties() {
+ public static Properties getDisabledRowWriterProperties() {
Review Comment:
reverted
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -157,6 +158,19 @@ protected HoodieTable(HoodieWriteConfig config,
HoodieEngineContext context, Hoo
this.taskContextSupplier = context.getTaskContextSupplier();
}
+ @VisibleForTesting
+ protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context,
HoodieTableMetaClient metaClient, FileSystemViewManager viewManager,
TaskContextSupplier supplier) {
Review Comment:
We can, but we would need to add a lot of extra mocks for viewManager and
supplier. It does not help with testing, and it makes the code unnecessarily
complex on a path we don't care about at all.
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java:
##########
@@ -372,7 +372,8 @@ public void testGetCommitsTimelineAfter() throws
IOException {
private HoodieTableMetaClient prepareMetaClient(
List<HoodieInstant> activeInstants,
List<HoodieInstant> archivedInstants,
- String startTs) throws IOException {
+ String startTs
+ ) throws IOException {
Review Comment:
done
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -1081,11 +1327,23 @@ private JavaRDD<WriteStatus>
createCommitWithInserts(HoodieWriteConfig cfg, Spar
return result;
}
+ private static void startSchemaEvolutionTransaction(HoodieTableMetaClient
metaClient, SparkRDDWriteClient client, String nextCommitTime2, HoodieTableType
tableType) throws IOException {
+ String commitActionType =
CommitUtils.getCommitActionType(WriteOperationType.UPSERT, tableType);
+ client.startCommitWithTime(nextCommitTime2, commitActionType);
+ client.preWrite(nextCommitTime2, WriteOperationType.UPSERT,
client.createMetaClient(true));
+ HoodieInstant requested =
metaClient.createNewInstant(HoodieInstant.State.REQUESTED, commitActionType,
nextCommitTime2);
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ metadata.setOperationType(WriteOperationType.UPSERT);
+
client.createMetaClient(true).getActiveTimeline().transitionRequestedToInflight(
+ requested,
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
Review Comment:
revised to the updated API
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -168,13 +189,238 @@ private static Iterable<Object[]>
providerClassResolutionStrategyAndTableType()
List<Object[]> opts = new ArrayList<>();
for (Object providerClass : LOCK_PROVIDER_CLASSES) {
for (ConflictResolutionStrategy resolutionStrategy :
CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
- opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass,
resolutionStrategy});
- opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass,
resolutionStrategy});
+ opts.add(new Object[] {COPY_ON_WRITE, providerClass,
resolutionStrategy});
+ opts.add(new Object[] {MERGE_ON_READ, providerClass,
resolutionStrategy});
}
}
return opts;
}
+ public static Stream<Arguments> concurrentAlterSchemaTestDimension() {
+ Object[][] data =
+ new Object[][] {
+ // First element set to true means before testing anything, we
make a commit
+ // with schema TRIP_EXAMPLE_SCHEMA.
+ // {<should create initial commit>, <table type>, <txn 1 writer
schema>, <txn 2 writer schema>,
+ // <should schema conflict>, <expected table schema after
resolution>}
+ // --------------|-----read write-----|validate &
commit|------------------------- txn 1
+ // --------------------------------|------read
write------|--validate & commit--|- txn 2
+ //
committed->|-------------------------------------------------------------------
initial commit (optional)
+
+ // No schema evolution, no conflict.
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA},
+ // No concurrent schema evolution, no conflict.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+          // If there is an initial commit defining the table schema as
TRIP_EXAMPLE_SCHEMA,
+          // as long as txn 2 sticks to that schema, backwards compatibility
handles everything.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+          // In case there are no initial commits, the table schema is not
really predefined.
+          // It means we effectively have 2 concurrent txns trying to
define the table schema
+          // differently in this case.
+ {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ // Concurrent schema evolution into the same schema does not
conflict.
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ // Concurrent schema evolution into different schemas conflicts.
+ // from clustering operation, instead of
TRIP_EXAMPLE_SCHEMA_EVOLVED_1 (from commit 2)
+ {true, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {true, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, false, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, COPY_ON_WRITE, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_2, true, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ {false, true, MERGE_ON_READ, TRIP_EXAMPLE_SCHEMA_EVOLVED_1,
TRIP_EXAMPLE_SCHEMA_EVOLVED_1, false, TRIP_EXAMPLE_SCHEMA_EVOLVED_1},
+ };
+ return Stream.of(data).map(Arguments::of);
+ }
+
+ /**
+ * Injects a new instant with customizable schema in commit metadata
+ * @param timestamp Instant timestamp
+ * @param schemaAttrValue Schema value to set in commit metadata (can be
null)
+ */
+ public void injectInstantWithSchema(
+ String timestamp,
+ String action,
+ String schemaAttrValue) throws Exception {
+ HoodieTestTable instantGenerator = HoodieTestTable.of(metaClient);
+ instantGenerator.addCommit(timestamp, Option.of(buildMetadata(
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ Option.empty(),
+ WriteOperationType.UNKNOWN,
+ schemaAttrValue,
+ action)));
+ }
+
+ @ParameterizedTest
+ @MethodSource("concurrentAlterSchemaTestDimension")
+ void testHoodieClientWithSchemaConflictResolution(
+ boolean createInitialCommit,
+ boolean createEmptyInitialCommit,
+ HoodieTableType tableType,
+ String writerSchema1,
+ String writerSchema2,
+ boolean shouldConflict,
+ String expectedTableSchemaAfterResolution) throws Exception {
+ if (tableType.equals(MERGE_ON_READ)) {
+ setUpMORTestTable();
+ }
+
+ Properties properties = new Properties();
+ properties.setProperty(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key(),
String.valueOf(0));
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+ properties.setProperty(ENABLE_SCHEMA_CONFLICT_RESOLUTION.key(), "true");
+
+ HoodieWriteConfig.Builder writeConfigBuilder =
getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+ .withAutoCommit(false)
+ .withProperties(properties);
+ HoodieWriteConfig writeConfig = writeConfigBuilder.build();
+ final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
+
+ int totalCommits = 0;
+ final String nextCommitTime11 = "0011";
+ final String nextCommitTime12 = "0012";
+ if (createInitialCommit) {
+ // Create the first commit ingesting some data.
+ createCommitWithInserts(writeConfig, client1, "000", nextCommitTime11,
100, true);
+ createCommitWithUpserts(writeConfig, client1, nextCommitTime11,
Option.empty(), nextCommitTime12, 100);
+ totalCommits += 2;
+ }
+
+ if (createEmptyInitialCommit) {
+ // Create an empty commit which does not contain a valid schema, schema
conflict resolution should still be able
+ // to handle it properly. This can happen in delta streamer where no
data is ingested but empty commit is created
+ // to save the checkpoint.
+ HoodieWriteConfig writeConfig22 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig22.setSchema("\"null\"");
+ final SparkRDDWriteClient client22 = getHoodieWriteClient(writeConfig22);
+ JavaRDD<HoodieRecord> emptyRDD = jsc.emptyRDD();
+ // Perform upsert with empty RDD
+ client22.startCommitWithTime("0013");
+ JavaRDD<WriteStatus> writeStatusRDD = client22.upsert(emptyRDD, "0013");
+ client22.commit("0013", writeStatusRDD);
+ totalCommits += 1;
+
+ // Validate table schema in the end.
+ TableSchemaResolver r = new TableSchemaResolver(metaClient);
+ // Assert no table schema is defined.
+ assertThrows(HoodieSchemaNotFoundException.class, () ->
r.getTableAvroSchema(false));
+ }
+
+ // Start txn 002 altering table schema
+ HoodieWriteConfig writeConfig2 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig2.setSchema(writerSchema1);
+ final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig2);
+ final String nextCommitTime21 = "0021";
+ startSchemaEvolutionTransaction(metaClient, client2, nextCommitTime21,
tableType);
+
+ // Start concurrent txn 003 alter table schema
+ HoodieWriteConfig writeConfig3 =
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ writeConfig3.setSchema(writerSchema2);
+ final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig3);
+ final String nextCommitTime31 = "0031";
+ startSchemaEvolutionTransaction(metaClient, client3, nextCommitTime31,
tableType);
+
+ Properties props = new TypedProperties();
+ props.setProperty("hoodie.datasource.write.row.writer.enable", "false");
+ HoodieWriteConfig clusterWriteCfg = writeConfigBuilder
+ .withProperties(props)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(true)
Review Comment:
Revised it to set clustering/compaction based on the table type. For MOR we want
to run compaction; also renamed the variable to avoid confusion.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]