This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8906b0dfeea [HUDI-5782] Tweak defaults and remove unnecessary configs
after config review (#8128)
8906b0dfeea is described below
commit 8906b0dfeea3decfbfd6c0645c67fac729c24cbb
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Apr 6 04:44:36 2023 +0530
[HUDI-5782] Tweak defaults and remove unnecessary configs after config
review (#8128)
This commit tweaks a few defaults and removes unnecessary configs after
config review:
- "hoodie.write.lock.dynamodb.table": added a default value of "hudi_locks"
- "hoodie.write.lock.dynamodb.table_creation_timeout": default changed from
  10 minutes to 2 minutes
- "hoodie.clustering.preserve.commit.metadata": removed since this should
  always be true
- "hoodie.compaction.preserve.commit.metadata": removed since this should
  always be true
- "hoodie.write.lock.max_wait_time_ms_between_retry": default changed from
  5s to 16s
- "hoodie.write.commit.callback.http.timeout.seconds": default changed from
  3s to 30s
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/config/DynamoDbBasedLockConfig.java | 4 +--
.../apache/hudi/config/HoodieClusteringConfig.java | 11 -------
.../apache/hudi/config/HoodieCompactionConfig.java | 11 -------
.../org/apache/hudi/config/HoodieLockConfig.java | 2 +-
.../config/HoodieWriteCommitCallbackConfig.java | 4 +--
.../org/apache/hudi/config/HoodieWriteConfig.java | 9 +-----
.../org/apache/hudi/io/HoodieCreateHandle.java | 4 ++-
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 4 ++-
.../metadata/HoodieBackedTableMetadataWriter.java | 3 --
.../PartitionAwareClusteringPlanStrategy.java | 2 +-
.../hudi/client/TestUpdateSchemaEvolution.java | 13 ++++----
.../TestHoodieClientOnCopyOnWriteStorage.java | 35 ++++++++--------------
.../hudi/client/functional/TestHoodieIndex.java | 2 +-
.../hudi/table/TestHoodieMergeOnReadTable.java | 9 +++---
...HoodieSparkMergeOnReadTableIncrementalRead.java | 4 +--
...dieSparkMergeOnReadTableInsertUpdateDelete.java | 4 +--
.../SparkClientFunctionalTestHarness.java | 8 ++---
.../apache/hudi/common/util/ClusteringUtils.java | 1 +
.../apache/hudi/configuration/FlinkOptions.java | 12 ++------
.../hudi/sink/clustering/ClusteringOperator.java | 25 +++++-----------
.../src/test/java/HoodieJavaStreamingApp.java | 6 ++--
.../org/apache/hudi/functional/TestBootstrap.java | 4 +--
.../TestHoodieSparkMergeOnReadTableClustering.java | 28 +++++++----------
.../apache/hudi/functional/TestOrcBootstrap.java | 2 +-
.../deltastreamer/TestHoodieDeltaStreamer.java | 13 ++------
25 files changed, 73 insertions(+), 147 deletions(-)
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
index 15e81bc90e3..4530340537b 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
@@ -46,7 +46,7 @@ public class DynamoDbBasedLockConfig extends HoodieConfig {
public static final ConfigProperty<String> DYNAMODB_LOCK_TABLE_NAME =
ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "table")
- .noDefaultValue()
+ .defaultValue("hudi_locks")
.sinceVersion("0.10.0")
.withDocumentation("For DynamoDB based lock provider, the name of the
DynamoDB table acting as lock table");
@@ -98,7 +98,7 @@ public class DynamoDbBasedLockConfig extends HoodieConfig {
public static final ConfigProperty<String>
DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "table_creation_timeout")
- .defaultValue(String.valueOf(10 * 60 * 1000))
+ .defaultValue(String.valueOf(2 * 60 * 1000))
.sinceVersion("0.10.0")
.withDocumentation("For DynamoDB based lock provider, the maximum number
of milliseconds to wait for creating DynamoDB table");
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index e04ce341e46..124d2444705 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -218,12 +218,6 @@ public class HoodieClusteringConfig extends HoodieConfig {
.withDocumentation("Enable running of clustering service, asynchronously
as inserts happen on the table.")
.withAlternatives("hoodie.datasource.clustering.async.enable");
- public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA =
ConfigProperty
- .key("hoodie.clustering.preserve.commit.metadata")
- .defaultValue(true)
- .sinceVersion("0.9.0")
- .withDocumentation("When rewriting data, preserves existing
hoodie_commit_time");
-
/**
* @deprecated this setting has no effect. Please refer to clustering
configuration, as well as
* {@link #LAYOUT_OPTIMIZE_STRATEGY} config to enable advanced record layout
optimization strategies
@@ -581,11 +575,6 @@ public class HoodieClusteringConfig extends HoodieConfig {
return this;
}
- public Builder withPreserveHoodieCommitMetadata(Boolean
preserveHoodieCommitMetadata) {
- clusteringConfig.setValue(PRESERVE_COMMIT_METADATA,
String.valueOf(preserveHoodieCommitMetadata));
- return this;
- }
-
public Builder withRollbackPendingClustering(Boolean
rollbackPendingClustering) {
clusteringConfig.setValue(ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT,
String.valueOf(rollbackPendingClustering));
return this;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index e37ff3c46bf..02d86463d80 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -151,12 +151,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
.withDocumentation("Used by
org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy to denote the
number of "
+ "latest partitions to compact during a compaction run.");
- public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA =
ConfigProperty
- .key("hoodie.compaction.preserve.commit.metadata")
- .defaultValue(true)
- .sinceVersion("0.11.0")
- .withDocumentation("When rewriting data, preserves existing
hoodie_commit_time");
-
/**
* Configs related to specific table types.
*/
@@ -423,11 +417,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this;
}
- public Builder withPreserveCommitMetadata(boolean preserveCommitMetadata) {
- compactionConfig.setValue(PRESERVE_COMMIT_METADATA,
String.valueOf(preserveCommitMetadata));
- return this;
- }
-
public Builder withLogCompactionBlocksThreshold(String
logCompactionBlocksThreshold) {
compactionConfig.setValue(LOG_COMPACTION_BLOCKS_THRESHOLD,
logCompactionBlocksThreshold);
return this;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index f91c5791a03..73ab98f7067 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -76,7 +76,7 @@ public class HoodieLockConfig extends HoodieConfig {
public static final ConfigProperty<String>
LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS = ConfigProperty
.key(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY)
- .defaultValue(String.valueOf(5000L))
+ .defaultValue(String.valueOf(16000L))
.sinceVersion("0.8.0")
.withDocumentation("Maximum amount of time to wait between retries by
lock provider client. This bounds"
+ " the maximum delay from the exponential backoff. Currently used
by ZK based lock provider only.");
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
index 8be57c05d1c..d9e0193dc72 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
@@ -67,9 +67,9 @@ public class HoodieWriteCommitCallbackConfig extends
HoodieConfig {
public static final ConfigProperty<Integer> CALLBACK_HTTP_TIMEOUT_IN_SECONDS
= ConfigProperty
.key(CALLBACK_PREFIX + "http.timeout.seconds")
- .defaultValue(3)
+ .defaultValue(30)
.sinceVersion("0.6.0")
- .withDocumentation("Callback timeout in seconds. 3 by default");
+ .withDocumentation("Callback timeout in seconds.");
/**
* @deprecated Use {@link #TURN_CALLBACK_ON} and its methods instead
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index c83792c22de..51539f585d9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1537,14 +1537,6 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE);
}
- public boolean isPreserveHoodieCommitMetadataForClustering() {
- return getBoolean(HoodieClusteringConfig.PRESERVE_COMMIT_METADATA);
- }
-
- public boolean isPreserveHoodieCommitMetadataForCompaction() {
- return getBoolean(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA);
- }
-
public boolean isClusteringEnabled() {
// TODO: future support async clustering
return inlineClusteringEnabled() || isAsyncClusteringEnabled();
@@ -2308,6 +2300,7 @@ public class HoodieWriteConfig extends HoodieConfig {
/**
* Hoodie Client Lock Configs.
+ *
* @return
*/
public boolean isAutoAdjustLockConfigs() {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 23edb15a780..8b20df3f1a5 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -35,6 +35,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
@@ -115,7 +116,8 @@ public class HoodieCreateHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Map<String, HoodieRecord<T>>
recordMap,
TaskContextSupplier taskContextSupplier) {
- this(config, instantTime, hoodieTable, partitionPath, fileId,
taskContextSupplier, config.isPreserveHoodieCommitMetadataForCompaction());
+ // preserveMetadata is disabled by default for MDT but enabled otherwise
+ this(config, instantTime, hoodieTable, partitionPath, fileId,
taskContextSupplier,
!HoodieTableMetadata.isMetadataTable(config.getBasePath()));
this.recordMap = recordMap;
this.useWriterSchema = true;
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 4202e2c590b..073a0e0aad1 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -47,6 +47,7 @@ import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
@@ -140,7 +141,8 @@ public class HoodieMergeHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O>
super(config, instantTime, partitionPath, fileId, hoodieTable,
taskContextSupplier);
this.keyToNewRecords = keyToNewRecords;
this.useWriterSchemaForCompaction = true;
- this.preserveMetadata =
config.isPreserveHoodieCommitMetadataForCompaction();
+ // preserveMetadata is disabled by default for MDT but enabled otherwise
+ this.preserveMetadata =
!HoodieTableMetadata.isMetadataTable(config.getBasePath());
init(fileId, this.partitionPath, dataFileToBeMerged);
validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index e5065baeb82..9ba192d4a42 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -291,9 +291,6 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
- // by default, the HFile does not keep the metadata fields, set up
as false
- // to always use the metadata of the new record.
- .withPreserveCommitMetadata(false)
.withEnableOptimizedLogBlocksScan(String.valueOf(writeConfig.enableOptimizedLogBlocksScan()))
.build())
.withParallelism(parallelism, parallelism)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 8c0bd0cab1f..5b6ee9075bd 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -122,7 +122,7 @@ public abstract class
PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
.setInputGroups(clusteringGroups)
.setExtraMetadata(getExtraMetadata())
.setVersion(getPlanVersion())
-
.setPreserveHoodieMetadata(getWriteConfig().isPreserveHoodieCommitMetadataForClustering())
+ .setPreserveHoodieMetadata(true)
.build());
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index 8e3252a8462..12106e011fd 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -30,11 +30,11 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
@@ -80,7 +80,6 @@ public class TestUpdateSchemaEvolution extends
HoodieClientTestHarness implement
private WriteStatus prepareFirstRecordCommit(List<String> recordsStrs)
throws IOException {
// Create a bunch of records with an old version of schema
final HoodieWriteConfig config =
makeHoodieClientConfig("/exampleSchema.avsc");
- config.setValue(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA, "false");
final HoodieSparkTable table = HoodieSparkTable.create(config, context);
final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x
-> {
List<HoodieRecord> insertRecords = new ArrayList<>();
@@ -91,9 +90,11 @@ public class TestUpdateSchemaEvolution extends
HoodieClientTestHarness implement
}
Map<String, HoodieRecord> insertRecordMap = insertRecords.stream()
.collect(Collectors.toMap(r -> r.getRecordKey(),
Function.identity()));
- HoodieCreateHandle<?,?,?,?> createHandle =
- new HoodieCreateHandle(config, "100", table,
insertRecords.get(0).getPartitionPath(), "f1-0", insertRecordMap, supplier);
- createHandle.write();
+ HoodieWriteHandle<?,?,?,?> createHandle = new
CreateHandleFactory<>(false)
+ .create(config, "100", table,
insertRecords.get(0).getPartitionPath(), "f1-0", supplier);
+ for (HoodieRecord record : insertRecordMap.values()) {
+ createHandle.write(record,
createHandle.getWriterSchemaWithMetaFields(),
createHandle.getConfig().getProps());
+ }
return createHandle.close().get(0);
}).collect();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 7f552b6da2d..222816ff772 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -199,15 +199,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of);
}
- private static Stream<Arguments>
populateMetaFieldsAndPreserveMetadataParams() {
- return Arrays.stream(new Boolean[][] {
- {true, true},
- {false, true},
- {true, false},
- {false, false}
- }).map(Arguments::of);
- }
-
private static Stream<Arguments> rollbackFailedCommitsParams() {
return Stream.of(
Arguments.of(HoodieFailedWritesCleaningPolicy.LAZY, true),
@@ -1040,8 +1031,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
.withCompactionConfig(
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
- .withBloomIndexUpdatePartitionPath(true)
- .withGlobalSimpleIndexUpdatePartitionPath(true)
.build()).withTimelineLayoutVersion(VERSION_0).build();
HoodieTableMetaClient.withPropertyBuilder()
@@ -1518,12 +1507,12 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
}
@ParameterizedTest
- @MethodSource("populateMetaFieldsAndPreserveMetadataParams")
- public void testSimpleClustering(boolean populateMetaFields, boolean
preserveCommitMetadata) throws Exception {
+ @MethodSource("populateMetaFieldsParams")
+ public void testSimpleClustering(boolean populateMetaFields) throws
Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
- .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
+ .build();
testInsertAndClustering(clusteringConfig, populateMetaFields, true, false,
SqlQueryEqualityPreCommitValidator.class.getName(),
COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@@ -1567,7 +1556,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
public void testRolblackOfRegularCommitWithPendingReplaceCommitInTimeline()
throws Exception {
HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
- .withPreserveHoodieCommitMetadata(true).build();
+ .build();
// trigger clustering, but do not complete
testInsertAndClustering(clusteringConfig, true, false, false,
SqlQueryEqualityPreCommitValidator.class.getName(),
COUNT_SQL_QUERY_FOR_VALIDATION, "");
@@ -1606,7 +1595,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
// setup clustering config.
HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(false).withScheduleInlineClustering(scheduleInlineClustering)
- .withPreserveHoodieCommitMetadata(true).build();
+ .build();
HoodieWriteConfig config =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false)
.withClusteringConfig(clusteringConfig)
@@ -1633,26 +1622,26 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
}
@ParameterizedTest
- @MethodSource("populateMetaFieldsAndPreserveMetadataParams")
- public void testClusteringWithSortColumns(boolean populateMetaFields,
boolean preserveCommitMetadata) throws Exception {
+ @MethodSource("populateMetaFieldsParams")
+ public void testClusteringWithSortColumns(boolean populateMetaFields) throws
Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns(populateMetaFields ? "_hoodie_record_key" :
"_row_key")
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
- .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
+ .build();
testInsertAndClustering(clusteringConfig, populateMetaFields, true, false,
SqlQueryEqualityPreCommitValidator.class.getName(),
COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@ParameterizedTest
- @MethodSource("populateMetaFieldsAndPreserveMetadataParams")
- public void testClusteringWithSortOneFilePerGroup(boolean
populateMetaFields, boolean preserveCommitMetadata) throws Exception {
+ @MethodSource("populateMetaFieldsParams")
+ public void testClusteringWithSortOneFilePerGroup(boolean
populateMetaFields) throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns("begin_lat,begin_lon")
.withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName())
.withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName())
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
- .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
+ .build();
// note that assertSameFileIds is true for this test because of the plan
and execution strategy
testInsertAndClustering(clusteringConfig, populateMetaFields, true, true,
SqlQueryEqualityPreCommitValidator.class.getName(),
COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@@ -1923,7 +1912,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
SparkRDDWriteClient client = getHoodieWriteClient(config);
String clusteringCommitTime =
client.scheduleClustering(Option.empty()).get().toString();
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata =
client.cluster(clusteringCommitTime, completeClustering);
- if (config.isPreserveHoodieCommitMetadataForClustering() &&
config.populateMetaFields()) {
+ if (config.populateMetaFields()) {
verifyRecordsWrittenWithPreservedMetadata(new
HashSet<>(allRecords.getRight()), allRecords.getLeft(),
clusterMetadata.getWriteStatuses().collect());
} else {
verifyRecordsWritten(clusteringCommitTime, populateMetaFields,
allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect(), config);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
index d6d5b2495a3..7745998fd97 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client.functional;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -38,7 +39,6 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLayoutConfig;
-import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 26f2705a4ab..184c7880c9e 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -217,13 +217,12 @@ public class TestHoodieMergeOnReadTable extends
SparkClientFunctionalTestHarness
}
// TODO: Enable metadata virtual keys in this test once the feature
HUDI-2593 is completed
- @ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta)
throws Exception {
+ @Test
+ public void testLogFileCountsAfterCompaction() throws Exception {
boolean populateMetaFields = true;
// insert 100 records
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true, false,
HoodieIndex.IndexType.BLOOM,
- 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(),
preserveCommitMeta);
+ 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build());
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig config = cfgBuilder.build();
@@ -304,7 +303,7 @@ public class TestHoodieMergeOnReadTable extends
SparkClientFunctionalTestHarness
List<Row> rows = actual.collectAsList();
assertEquals(updatedRecords.size(), rows.size());
for (Row row : rows) {
- assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD),
preserveCommitMeta ? newCommitTime : compactionInstantTime);
+ assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD),
newCommitTime);
}
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
index 275fd32ca7d..ddf458f9505 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
@@ -170,13 +170,13 @@ public class
TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
// verify new write shows up in snapshot mode after compaction is
complete
snapshotROFiles = getROSnapshotFiles(partitionPath);
- validateFiles(partitionPath,2, snapshotROFiles, false,
roSnapshotJobConf,400, commitTime1, compactionCommitTime,
+ validateFiles(partitionPath,2, snapshotROFiles, false,
roSnapshotJobConf,400, commitTime1, updateTime,
insertsTime);
incrementalROFiles = getROIncrementalFiles(partitionPath, "002", -1,
true);
assertTrue(incrementalROFiles.length == 2);
// verify 006 shows up because of pending compaction
- validateFiles(partitionPath, 2, incrementalROFiles, false, roJobConf,
400, commitTime1, compactionCommitTime,
+ validateFiles(partitionPath, 2, incrementalROFiles, false, roJobConf,
400, commitTime1, updateTime,
insertsTime);
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index 73b1da95648..f061c152104 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -149,7 +149,7 @@ public class
TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
HoodieWriteConfig cfg = getConfigBuilder(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024
* 1024 * 1024)
-
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withPreserveCommitMetadata(true).withScheduleInlineCompaction(scheduleInlineCompaction).build())
+
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withScheduleInlineCompaction(scheduleInlineCompaction).build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
@@ -191,7 +191,7 @@ public class
TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
HoodieWriteConfig cfg = getConfigBuilder(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024
* 1024 * 1024)
-
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withPreserveCommitMetadata(true).withScheduleInlineCompaction(scheduleInlineCompaction).build())
+
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withScheduleInlineCompaction(scheduleInlineCompaction).build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index 88d108879b4..511613d9044 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -360,20 +360,20 @@ public class SparkClientFunctionalTestHarness implements
SparkProvider, HoodieMe
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit,
long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
- return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM,
compactionSmallFileSize, clusteringConfig, false);
+ return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM,
compactionSmallFileSize, clusteringConfig);
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit,
Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) {
- return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024
* 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), false);
+ return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024
* 1024 * 1024L, HoodieClusteringConfig.newBuilder().build());
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit,
Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType,
- long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig,
boolean preserveCommitMetaForCompaction) {
+ long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
return
HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2,
2)
.withDeleteParallelism(2)
.withAutoCommit(autoCommit)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(compactionSmallFileSize)
-
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).withPreserveCommitMetadata(preserveCommitMetaForCompaction).build())
+
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 *
1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
.withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 5d724ff9621..3de4f796e9b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -181,6 +181,7 @@ public class ClusteringUtils {
.setInputGroups(clusteringGroups)
.setExtraMetadata(extraMetadata)
.setStrategy(strategy)
+ .setPreserveHoodieMetadata(true)
.build();
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 2d9243ab5b4..54ee03451cd 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -31,8 +31,6 @@ import org.apache.hudi.common.model.HoodieSyncTableStrategy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
@@ -303,10 +301,7 @@ public class FlinkOptions extends HoodieConfig {
.booleanType()
.defaultValue(false)// default read as batch
.withDescription("Whether to skip compaction instants and avoid reading
compacted base files for streaming read to improve read performance.\n"
- + "There are two cases that this option can be used to avoid reading
duplicates:\n"
- + "1) you are definitely sure that the consumer reads [faster
than/completes before] any compaction instants "
- + "when " + HoodieCompactionConfig.PRESERVE_COMMIT_METADATA.key() +
" is set to false.\n"
- + "2) changelog mode is enabled, this option is a solution to keep
data integrity");
+ + "This option can be used to avoid reading duplicates when
changelog mode is enabled, it is a solution to keep data integrity\n");
// this option is experimental
public static final ConfigOption<Boolean> READ_STREAMING_SKIP_CLUSTERING =
ConfigOptions
@@ -314,10 +309,7 @@ public class FlinkOptions extends HoodieConfig {
.booleanType()
.defaultValue(false)
.withDescription("Whether to skip clustering instants to avoid
reading base files of clustering operations for streaming read "
- + "to improve read performance.\n"
- + "This option toggled to true to avoid duplicates when: \n"
- + "1) you are definitely sure that the consumer reads [faster
than/completes before] any clustering instants "
- + "when " +
HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key() + " is set to false.\n");
+ + "to improve read performance.");
public static final String START_COMMIT_EARLIEST = "earliest";
public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index bf05e54117e..85a5626e3b8 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -18,23 +18,21 @@
package org.apache.hudi.sink.clustering;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.common.config.HoodieStorageConfig;
-import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
@@ -54,7 +52,6 @@ import org.apache.hudi.util.FlinkWriteClients;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
@@ -63,8 +60,8 @@ import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
@@ -92,8 +89,6 @@ import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
-import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
-
/**
* Operator to execute the actual clustering task assigned by the clustering
plan task.
* In order to execute scalable, the input should shuffle by the clustering
event {@link ClusteringPlanEvent}.
@@ -103,7 +98,6 @@ public class ClusteringOperator extends
TableStreamOperator<ClusteringCommitEven
private static final Logger LOG =
LoggerFactory.getLogger(ClusteringOperator.class);
private final Configuration conf;
- private final boolean preserveHoodieMetadata;
private final RowType rowType;
private int taskID;
private transient HoodieWriteConfig writeConfig;
@@ -133,10 +127,7 @@ public class ClusteringOperator extends
TableStreamOperator<ClusteringCommitEven
public ClusteringOperator(Configuration conf, RowType rowType) {
this.conf = conf;
- this.preserveHoodieMetadata =
conf.getBoolean(HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key(),
HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.defaultValue());
- this.rowType = this.preserveHoodieMetadata
- ? BulkInsertWriterHelper.addMetadataFields(rowType, false)
- : rowType;
+ this.rowType = BulkInsertWriterHelper.addMetadataFields(rowType, false);
this.asyncClustering = OptionsResolver.needsAsyncClustering(conf);
this.sortClusteringEnabled = OptionsResolver.sortClusteringEnabled(conf);
@@ -161,7 +152,7 @@ public class ClusteringOperator extends
TableStreamOperator<ClusteringCommitEven
this.table = writeClient.getHoodieTable();
this.schema = AvroSchemaConverter.convertToSchema(rowType);
- this.readerSchema = this.preserveHoodieMetadata ? this.schema :
HoodieAvroUtils.addMetadataFields(this.schema);
+ this.readerSchema = this.schema;
this.requiredPos = getRequiredPositions();
this.avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(rowType);
@@ -228,7 +219,7 @@ public class ClusteringOperator extends
TableStreamOperator<ClusteringCommitEven
BulkInsertWriterHelper writerHelper = new
BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
instantTime, this.taskID,
getRuntimeContext().getNumberOfParallelSubtasks(),
getRuntimeContext().getAttemptNumber(),
- this.rowType, this.preserveHoodieMetadata);
+ this.rowType, true);
List<ClusteringOperation> clusteringOps =
clusteringGroupInfo.getOperations();
boolean hasLogFiles = clusteringOps.stream().anyMatch(op ->
op.getDeltaFilePaths().size() > 0);
@@ -348,9 +339,7 @@ public class ClusteringOperator extends
TableStreamOperator<ClusteringCommitEven
* Transform IndexedRecord into HoodieRecord.
*/
private RowData transform(IndexedRecord indexedRecord) {
- GenericRecord record = this.preserveHoodieMetadata
- ? (GenericRecord) indexedRecord
- : buildAvroRecordBySchema(indexedRecord, schema, requiredPos, new
GenericRecordBuilder(schema));
+ GenericRecord record = (GenericRecord) indexedRecord;
return (RowData) avroToRowDataConverter.convert(record);
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index ce5b7e739d2..8e5897d14e1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -302,7 +302,7 @@ public class HoodieJavaStreamingApp {
// wait for spark streaming to process one microbatch
Thread.sleep(3000);
waitTillNCommits(fs, numExpCommits, 180, 3);
- commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
+ commitInstantTime2 = HoodieDataSourceHelpers.listCommitsSince(fs,
tablePath, commitInstantTime1).stream().sorted().findFirst().get();
LOG.info("Second commit at instant time :" + commitInstantTime2);
}
@@ -312,8 +312,7 @@ public class HoodieJavaStreamingApp {
}
// Wait for compaction to also finish and track latest timestamp as
commit timestamp
waitTillNCommits(fs, numExpCommits, 180, 3);
- commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
- LOG.info("Compaction commit at instant time :" + commitInstantTime2);
+ LOG.info("Compaction commit at instant time :" +
HoodieDataSourceHelpers.latestCommit(fs, tablePath));
}
/**
@@ -377,7 +376,6 @@ public class HoodieJavaStreamingApp {
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),
"1")
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "true")
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
- .option(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA.key(), "false")
.option(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH().key(),"true")
.option(HoodieWriteConfig.TBL_NAME.key(),
tableName).option("checkpointLocation", checkpointLocation)
.outputMode(OutputMode.Append());
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index be93171adc2..63e2f7f58c1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -58,8 +58,8 @@ import org.apache.hudi.io.storage.HoodieAvroParquetReader;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.table.action.bootstrap.BootstrapUtils;
-import org.apache.hudi.testutils.HoodieSparkClientTestBase;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -305,7 +305,7 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
client.compact(compactionInstant.get());
checkBootstrapResults(totalRecords, schema, compactionInstant.get(),
checkNumRawFiles,
numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp,
!deltaCommit,
- Arrays.asList(compactionInstant.get()),
!config.isPreserveHoodieCommitMetadataForCompaction());
+ Arrays.asList(compactionInstant.get()), false);
}
client.close();
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
index 3a84755248e..c6b0560b87e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
@@ -61,28 +61,20 @@ class TestHoodieSparkMergeOnReadTableClustering extends
SparkClientFunctionalTes
private static Stream<Arguments> testClustering() {
// enableClusteringAsRow, doUpdates, populateMetaFields,
preserveCommitMetadata
return Stream.of(
- Arguments.of(true, true, true, true),
- Arguments.of(true, true, true, false),
- Arguments.of(true, true, false, true),
- Arguments.of(true, true, false, false),
- Arguments.of(true, false, true, true),
- Arguments.of(true, false, true, false),
- Arguments.of(true, false, false, true),
- Arguments.of(true, false, false, false),
- Arguments.of(false, true, true, true),
- Arguments.of(false, true, true, false),
- Arguments.of(false, true, false, true),
- Arguments.of(false, true, false, false),
- Arguments.of(false, false, true, true),
- Arguments.of(false, false, true, false),
- Arguments.of(false, false, false, true),
- Arguments.of(false, false, false, false)
+ Arguments.of(true, true, true),
+ Arguments.of(true, true, false),
+ Arguments.of(true, false, true),
+ Arguments.of(true, false, false),
+ Arguments.of(false, true, true),
+ Arguments.of(false, true, false),
+ Arguments.of(false, false, true),
+ Arguments.of(false, false, false)
);
}
@ParameterizedTest
@MethodSource
- void testClustering(boolean clusteringAsRow, boolean doUpdates, boolean
populateMetaFields, boolean preserveCommitMetadata) throws Exception {
+ void testClustering(boolean clusteringAsRow, boolean doUpdates, boolean
populateMetaFields) throws Exception {
// set low compaction small File Size to generate more file groups.
HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
@@ -106,7 +98,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends
SparkClientFunctionalTes
.withClusteringTargetPartitions(0)
.withInlineClustering(true)
.withInlineClusteringNumCommits(1)
- .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build())
+ .build())
.withRollbackUsingMarkers(false);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
index 77a980f01e5..d442069d50f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
@@ -297,7 +297,7 @@ public class TestOrcBootstrap extends
HoodieSparkClientTestBase {
client.compact(compactionInstant.get());
checkBootstrapResults(totalRecords, schema, compactionInstant.get(),
checkNumRawFiles,
numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp,
!deltaCommit,
- Arrays.asList(compactionInstant.get()),
!config.isPreserveHoodieCommitMetadataForCompaction());
+ Arrays.asList(compactionInstant.get()), false);
}
client.close();
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 879c220faa2..fe2e1702993 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -974,8 +974,8 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
@ParameterizedTest
- @CsvSource(value = {"true, AVRO", "true, SPARK", "false, AVRO", "false,
SPARK"})
- public void testInlineClustering(String preserveCommitMetadata,
HoodieRecordType recordType) throws Exception {
+ @CsvSource(value = {"AVRO", "SPARK"})
+ public void testInlineClustering(HoodieRecordType recordType) throws
Exception {
String tableBasePath = basePath + "/inlineClustering";
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
@@ -985,7 +985,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
TestHelpers.addRecordMerger(recordType, cfg.configs);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
- cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true",
"2", "", "", preserveCommitMetadata));
+ cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true",
"2", "", ""));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
@@ -1122,13 +1122,6 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
- private List<String> getAsyncServicesConfigs(int totalRecords, String
autoClean, String inlineCluster, String inlineClusterMaxCommit,
- String asyncCluster, String
asyncClusterMaxCommit, String preserveCommitMetadata) {
- List<String> configs = getAsyncServicesConfigs(totalRecords, autoClean,
inlineCluster, inlineClusterMaxCommit, asyncCluster, asyncClusterMaxCommit);
- configs.add(String.format("%s=%s",
HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key(), preserveCommitMetadata));
- return configs;
- }
-
private List<String> getAsyncServicesConfigs(int totalRecords, String
autoClean, String inlineCluster,
String inlineClusterMaxCommit,
String asyncCluster, String asyncClusterMaxCommit) {
List<String> configs = new ArrayList<>();