This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8da99f8a5c9 [HUDI-6540] Support failed writes clean policy for Flink
(#9211)
8da99f8a5c9 is described below
commit 8da99f8a5c9ce3abd5a5a14baf3a8db81c3d39f0
Author: Nicholas Jiang <[email protected]>
AuthorDate: Wed Aug 2 16:30:53 2023 +0800
[HUDI-6540] Support failed writes clean policy for Flink (#9211)
---
.../apache/hudi/configuration/OptionsResolver.java | 14 ++++++++------
.../sink/clustering/FlinkClusteringConfig.java | 19 +++++++++++++++++++
.../hudi/sink/compact/FlinkCompactionConfig.java | 19 +++++++++++++++++++
.../apache/hudi/streamer/HoodieFlinkStreamer.java | 3 +++
.../org/apache/hudi/table/HoodieTableSink.java | 3 +++
.../org/apache/hudi/util/FlinkWriteClients.java | 1 -
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 22 ++++++++++++++++++++++
7 files changed, 74 insertions(+), 7 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index b68dcbd0698..8f4b013de04 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -24,12 +24,13 @@ import
org.apache.hudi.client.transaction.ConflictResolutionStrategy;
import
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
-import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
@@ -281,11 +282,7 @@ public class OptionsResolver {
* Returns whether the writer txn should be guarded by lock.
*/
public static boolean isLockRequired(Configuration conf) {
- return conf.getBoolean(FlinkOptions.METADATA_ENABLED)
- || ConfigUtils.resolveEnum(WriteConcurrencyMode.class, conf.getString(
- HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
- HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()))
- == WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
+ return conf.getBoolean(FlinkOptions.METADATA_ENABLED) ||
isOptimisticConcurrencyControl(conf);
}
/**
@@ -346,6 +343,11 @@ public class OptionsResolver {
return conf.getBoolean(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false);
}
+ public static boolean isLazyFailedWritesCleanPolicy(Configuration conf) {
+ return
conf.getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.defaultValue())
+ .equalsIgnoreCase(HoodieFailedWritesCleaningPolicy.LAZY.name());
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
index 05559c9b732..f533297599e 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
@@ -20,6 +20,7 @@ package org.apache.hudi.sink.clustering;
import
org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
@@ -61,11 +62,26 @@ public class FlinkClusteringConfig extends Configuration {
@Parameter(names = {"--clustering-tasks"}, description = "Parallelism of
tasks that do actual clustering, default is -1")
public Integer clusteringTasks = -1;
+ @Parameter(names = {"--clean-policy"},
+ description = "Clean policy to manage the Hudi table. Available options:
KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS. "
+ + "Default is KEEP_LATEST_COMMITS.")
+ public String cleanPolicy = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
+
@Parameter(names = {"--clean-retain-commits"},
description = "Number of commits to retain. So data will be retained for
num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally
pull on this table, default 10")
public Integer cleanRetainCommits = 10;
+ @Parameter(names = {"--clean-retain-hours"},
+ description = "Number of hours for which commits need to be retained.
This config provides a more flexible option as "
+ + "compared to number of commits retained for cleaning service.
Setting this property ensures all the files, but the latest in a file group,"
+ + " corresponding to commits with commit times older than the
configured number of hours to be retained are cleaned. default 24")
+ public Integer cleanRetainHours = 24;
+
+ @Parameter(names = {"--clean-retain-file-versions"},
+ description = "Number of file versions to retain. Each file group will
be retained for this number of versions. default 5")
+ public Integer cleanRetainFileVersions = 5;
+
@Parameter(names = {"--archive-min-commits"},
description = "Min number of commits to keep before archiving older
commits into a sequential log, default 20.")
public Integer archiveMinCommits = 20;
@@ -165,7 +181,10 @@ public class FlinkClusteringConfig extends Configuration {
conf.setString(FlinkOptions.PATH, config.path);
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS,
config.archiveMaxCommits);
conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS,
config.archiveMinCommits);
+ conf.setString(FlinkOptions.CLEAN_POLICY, config.cleanPolicy);
conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS,
config.cleanRetainCommits);
+ conf.setInteger(FlinkOptions.CLEAN_RETAIN_HOURS, config.cleanRetainHours);
+ conf.setInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS,
config.cleanRetainFileVersions);
conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS,
config.clusteringDeltaCommits);
conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks);
conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS,
config.planStrategyClass);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index 0327039e872..5b58dc7ee96 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -19,6 +19,7 @@
package org.apache.hudi.sink.compact;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
@@ -76,11 +77,26 @@ public class FlinkCompactionConfig extends Configuration {
@Parameter(names = {"--clean-async-enabled"}, description = "Whether to
cleanup the old commits immediately on new commits, enabled by default")
public Boolean cleanAsyncEnable = false;
+ @Parameter(names = {"--clean-policy"},
+ description = "Clean policy to manage the Hudi table. Available options:
KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS. "
+ + "Default is KEEP_LATEST_COMMITS.")
+ public String cleanPolicy = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
+
@Parameter(names = {"--clean-retain-commits"},
description = "Number of commits to retain. So data will be retained for
num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally
pull on this table, default 10")
public Integer cleanRetainCommits = 10;
+ @Parameter(names = {"--clean-retain-hours"},
+ description = "Number of hours for which commits need to be retained.
This config provides a more flexible option as "
+ + "compared to number of commits retained for cleaning service.
Setting this property ensures all the files, but the latest in a file group,"
+ + " corresponding to commits with commit times older than the
configured number of hours to be retained are cleaned. default 24")
+ public Integer cleanRetainHours = 24;
+
+ @Parameter(names = {"--clean-retain-file-versions"},
+ description = "Number of file versions to retain. Each file group will
be retained for this number of versions. default 5")
+ public Integer cleanRetainFileVersions = 5;
+
@Parameter(names = {"--archive-min-commits"},
description = "Min number of commits to keep before archiving older
commits into a sequential log, default 20.")
public Integer archiveMinCommits = 20;
@@ -165,7 +181,10 @@ public class FlinkCompactionConfig extends Configuration {
conf.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY,
config.compactionTriggerStrategy);
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS,
config.archiveMaxCommits);
conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS,
config.archiveMinCommits);
+ conf.setString(FlinkOptions.CLEAN_POLICY, config.cleanPolicy);
conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS,
config.cleanRetainCommits);
+ conf.setInteger(FlinkOptions.CLEAN_RETAIN_HOURS, config.cleanRetainHours);
+ conf.setInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS,
config.cleanRetainFileVersions);
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS,
config.compactionDeltaCommits);
conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS,
config.compactionDeltaSeconds);
conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY,
config.compactionMaxMemory);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index c353dd2baa4..b45f9ca3879 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -106,6 +106,9 @@ public class HoodieFlinkStreamer {
pipeline = Pipelines.append(conf, rowType, dataStream, false);
if (OptionsResolver.needsAsyncClustering(conf)) {
Pipelines.cluster(conf, rowType, pipeline);
+ } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
+ // add clean function to rollback failed writes for lazy failed writes
cleaning policy
+ Pipelines.clean(conf, pipeline);
} else {
Pipelines.dummySink(pipeline);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 81b3a6eefd5..0096fb3476f 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -95,6 +95,9 @@ public class HoodieTableSink implements
DataStream<Object> pipeline = Pipelines.append(conf, rowType,
dataStream, context.isBounded());
if (OptionsResolver.needsAsyncClustering(conf)) {
return Pipelines.cluster(conf, rowType, pipeline);
+ } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
+ // add clean function to rollback failed writes for lazy failed
writes cleaning policy
+ return Pipelines.clean(conf, pipeline);
} else {
return Pipelines.dummySink(pipeline);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 960b85d95ab..3e84e8493e3 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -236,7 +236,6 @@ public class FlinkWriteClients {
.build());
}
- // do not configure cleaning strategy as LAZY until multi-writers is
supported.
HoodieWriteConfig writeConfig = builder.build();
if (loadFsViewStorageConfig &&
!conf.containsKey(FileSystemViewStorageConfig.REMOTE_HOST_NAME.key())) {
// do not use the builder to give a change for recovering the original
fs view storage config
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index d5d7c84ccab..ea8b8b75ce5 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -19,11 +19,13 @@
package org.apache.hudi.sink;
import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieWriteConflictException;
@@ -584,6 +586,26 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
writeClient.close();
}
+ @Test
+ public void testRollbackFailedWritesWithLazyCleanPolicy() throws Exception {
+ conf.setString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name());
+
+ preparePipeline()
+ .consume(TestData.DATA_SET_INSERT)
+ .checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1)
+ .subTaskFails(0, 0)
+ .assertEmptyEvent()
+ .rollbackLastCompleteInstantToInflight()
+ .jobFailover()
+ .subTaskFails(0, 1)
+ // the last checkpoint instant was not rolled back by subTaskFails(0,
1)
+ // with LAZY cleaning strategy because clean action could roll back
failed writes.
+ .assertNextEvent()
+ .end();
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------