This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8da99f8a5c9 [HUDI-6540] Support failed writes clean policy for Flink (#9211)
8da99f8a5c9 is described below

commit 8da99f8a5c9ce3abd5a5a14baf3a8db81c3d39f0
Author: Nicholas Jiang <[email protected]>
AuthorDate: Wed Aug 2 16:30:53 2023 +0800

    [HUDI-6540] Support failed writes clean policy for Flink (#9211)
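
    With this change the Flink writer pipelines, the Flink streamer, and the
    standalone compaction and clustering jobs honor the failed-writes clean
    policy. A minimal sketch of opting a writer into the LAZY policy, mirroring
    the new test below (the table path is hypothetical):

        import org.apache.flink.configuration.Configuration;
        import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
        import org.apache.hudi.config.HoodieCleanConfig;
        import org.apache.hudi.configuration.FlinkOptions;

        Configuration conf = new Configuration();
        // hypothetical table path, for illustration only
        conf.setString(FlinkOptions.PATH, "file:///tmp/hudi_table");
        // LAZY defers rolling back failed writes to the clean action
        conf.setString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
            HoodieFailedWritesCleaningPolicy.LAZY.name());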
---
 .../apache/hudi/configuration/OptionsResolver.java | 14 ++++++++------
 .../sink/clustering/FlinkClusteringConfig.java     | 19 +++++++++++++++++++
 .../hudi/sink/compact/FlinkCompactionConfig.java   | 19 +++++++++++++++++++
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |  3 +++
 .../org/apache/hudi/table/HoodieTableSink.java     |  3 +++
 .../org/apache/hudi/util/FlinkWriteClients.java    |  1 -
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 22 ++++++++++++++++++++++
 7 files changed, 74 insertions(+), 7 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index b68dcbd0698..8f4b013de04 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -24,12 +24,13 @@ import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
-import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
@@ -281,11 +282,7 @@ public class OptionsResolver {
    * Returns whether the writer txn should be guarded by lock.
    */
   public static boolean isLockRequired(Configuration conf) {
-    return conf.getBoolean(FlinkOptions.METADATA_ENABLED)
-        || ConfigUtils.resolveEnum(WriteConcurrencyMode.class, conf.getString(
-        HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
-        HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()))
-        == WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
+    return conf.getBoolean(FlinkOptions.METADATA_ENABLED) || isOptimisticConcurrencyControl(conf);
   }
 
   /**
@@ -346,6 +343,11 @@ public class OptionsResolver {
     return conf.getBoolean(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false);
   }
 
+  public static boolean isLazyFailedWritesCleanPolicy(Configuration conf) {
+    return conf.getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.defaultValue())
+        .equalsIgnoreCase(HoodieFailedWritesCleaningPolicy.LAZY.name());
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
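
The refactored isLockRequired delegates to an isOptimisticConcurrencyControl
helper whose body is not part of this diff. For reference, a sketch of the
equivalent check, written out from the inlined logic removed above:

    // Equivalent OCC check, reconstructed from the removed inline logic;
    // the actual helper implementation is not shown in this diff.
    boolean occ = ConfigUtils.resolveEnum(WriteConcurrencyMode.class, conf.getString(
        HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
        HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()))
        == WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
    boolean lockRequired = conf.getBoolean(FlinkOptions.METADATA_ENABLED) || occ;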
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
index 05559c9b732..f533297599e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
@@ -20,6 +20,7 @@ package org.apache.hudi.sink.clustering;
 
 import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -61,11 +62,26 @@ public class FlinkClusteringConfig extends Configuration {
   @Parameter(names = {"--clustering-tasks"}, description = "Parallelism of 
tasks that do actual clustering, default is -1")
   public Integer clusteringTasks = -1;
 
+  @Parameter(names = {"--clean-policy"},
+      description = "Clean policy to manage the Hudi table. Available option: 
KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS."
+          + "Default is KEEP_LATEST_COMMITS.")
+  public String cleanPolicy = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
+
   @Parameter(names = {"--clean-retain-commits"},
       description = "Number of commits to retain. So data will be retained for 
num_of_commits * time_between_commits (scheduled).\n"
           + "This also directly translates into how much you can incrementally 
pull on this table, default 10")
   public Integer cleanRetainCommits = 10;
 
+  @Parameter(names = {"--clean-retain-hours"},
+      description = "Number of hours for which commits need to be retained. 
This config provides a more flexible option as"
+          + "compared to number of commits retained for cleaning service. 
Setting this property ensures all the files, but the latest in a file group,"
+          + " corresponding to commits with commit times older than the 
configured number of hours to be retained are cleaned. default 24")
+  public Integer cleanRetainHours = 24;
+
+  @Parameter(names = {"--clean-retain-file-versions"},
+      description = "Number of file versions to retain. Each file group will 
be retained for this number of version. default 5")
+  public Integer cleanRetainFileVersions = 5;
+
   @Parameter(names = {"--archive-min-commits"},
       description = "Min number of commits to keep before archiving older 
commits into a sequential log, default 20.")
   public Integer archiveMinCommits = 20;
@@ -165,7 +181,10 @@ public class FlinkClusteringConfig extends Configuration {
     conf.setString(FlinkOptions.PATH, config.path);
    conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);
    conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, config.archiveMinCommits);
+    conf.setString(FlinkOptions.CLEAN_POLICY, config.cleanPolicy);
    conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
+    conf.setInteger(FlinkOptions.CLEAN_RETAIN_HOURS, config.cleanRetainHours);
+    conf.setInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS, config.cleanRetainFileVersions);
    conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, config.clusteringDeltaCommits);
    conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks);
    conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, config.planStrategyClass);
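
The new flags map one-to-one onto existing FlinkOptions, so the same clean
settings can be applied programmatically on a writer configuration. A minimal
sketch using the options wired up above (the retention values are illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.common.model.HoodieCleaningPolicy;
    import org.apache.hudi.configuration.FlinkOptions;

    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.CLEAN_POLICY, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name());
    // retains commits from the last 24 hours (used by KEEP_LATEST_BY_HOURS)
    conf.setInteger(FlinkOptions.CLEAN_RETAIN_HOURS, 24);
    // retains 5 versions per file group (used by KEEP_LATEST_FILE_VERSIONS)
    conf.setInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS, 5);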
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index 0327039e872..5b58dc7ee96 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -76,11 +77,26 @@ public class FlinkCompactionConfig extends Configuration {
   @Parameter(names = {"--clean-async-enabled"}, description = "Whether to 
cleanup the old commits immediately on new commits, enabled by default")
   public Boolean cleanAsyncEnable = false;
 
+  @Parameter(names = {"--clean-policy"},
+      description = "Clean policy to manage the Hudi table. Available option: 
KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS."
+          + "Default is KEEP_LATEST_COMMITS.")
+  public String cleanPolicy = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
+
   @Parameter(names = {"--clean-retain-commits"},
       description = "Number of commits to retain. So data will be retained for 
num_of_commits * time_between_commits (scheduled).\n"
           + "This also directly translates into how much you can incrementally 
pull on this table, default 10")
   public Integer cleanRetainCommits = 10;
 
+  @Parameter(names = {"--clean-retain-hours"},
+      description = "Number of hours for which commits need to be retained. 
This config provides a more flexible option as"
+          + "compared to number of commits retained for cleaning service. 
Setting this property ensures all the files, but the latest in a file group,"
+          + " corresponding to commits with commit times older than the 
configured number of hours to be retained are cleaned. default 24")
+  public Integer cleanRetainHours = 24;
+
+  @Parameter(names = {"--clean-retain-file-versions"},
+      description = "Number of file versions to retain. Each file group will 
be retained for this number of version. default 5")
+  public Integer cleanRetainFileVersions = 5;
+
   @Parameter(names = {"--archive-min-commits"},
       description = "Min number of commits to keep before archiving older 
commits into a sequential log, default 20.")
   public Integer archiveMinCommits = 20;
@@ -165,7 +181,10 @@ public class FlinkCompactionConfig extends Configuration {
    conf.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, config.compactionTriggerStrategy);
    conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);
    conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, config.archiveMinCommits);
+    conf.setString(FlinkOptions.CLEAN_POLICY, config.cleanPolicy);
    conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
+    conf.setInteger(FlinkOptions.CLEAN_RETAIN_HOURS, config.cleanRetainHours);
+    conf.setInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS, config.cleanRetainFileVersions);
    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, config.compactionDeltaCommits);
    conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds);
    conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
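
The standalone compaction job receives the same flags through the public
FlinkCompactionConfig fields. A sketch of the programmatic equivalent; the
path field and the FlinkCompactionConfig.toFlinkConfig translator are assumed
from the surrounding code and are not shown in this diff:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.common.model.HoodieCleaningPolicy;
    import org.apache.hudi.sink.compact.FlinkCompactionConfig;

    FlinkCompactionConfig config = new FlinkCompactionConfig();
    config.path = "hdfs:///tmp/hudi_table";  // hypothetical table path
    config.cleanPolicy = HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name();
    config.cleanRetainFileVersions = 5;      // keep at most 5 versions per file group
    // translates the CLI-style fields into the FlinkOptions keys set above
    Configuration conf = FlinkCompactionConfig.toFlinkConfig(config);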
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index c353dd2baa4..b45f9ca3879 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -106,6 +106,9 @@ public class HoodieFlinkStreamer {
       pipeline = Pipelines.append(conf, rowType, dataStream, false);
       if (OptionsResolver.needsAsyncClustering(conf)) {
         Pipelines.cluster(conf, rowType, pipeline);
+      } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
+        // add clean function to rollback failed writes for lazy failed writes cleaning policy
+        Pipelines.clean(conf, pipeline);
       } else {
         Pipelines.dummySink(pipeline);
       }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 81b3a6eefd5..0096fb3476f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -95,6 +95,9 @@ public class HoodieTableSink implements
        DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
         if (OptionsResolver.needsAsyncClustering(conf)) {
           return Pipelines.cluster(conf, rowType, pipeline);
+        } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
+          // add clean function to rollback failed writes for lazy failed writes cleaning policy
+          return Pipelines.clean(conf, pipeline);
         } else {
           return Pipelines.dummySink(pipeline);
         }
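
For SQL jobs going through HoodieTableSink, the lazy policy can be supplied as
a table option, since hoodie.* keys are passed through to the write config. A
sketch under that assumption (the table schema and path are illustrative; the
insert operation selects the append pipeline shown in the hunk above):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    tEnv.executeSql(
        "CREATE TABLE hudi_sink (id INT, name STRING) WITH ("
            + " 'connector' = 'hudi',"
            + " 'path' = 'file:///tmp/hudi_sink',"
            + " 'write.operation' = 'insert',"
            + " 'hoodie.cleaner.policy.failed.writes' = 'LAZY'"
            + ")");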
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 960b85d95ab..3e84e8493e3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -236,7 +236,6 @@ public class FlinkWriteClients {
           .build());
     }
 
-    // do not configure cleaning strategy as LAZY until multi-writers is supported.
    HoodieWriteConfig writeConfig = builder.build();
    if (loadFsViewStorageConfig && !conf.containsKey(FileSystemViewStorageConfig.REMOTE_HOST_NAME.key())) {
      // do not use the builder to give a chance for recovering the original fs view storage config
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index d5d7c84ccab..ea8b8b75ce5 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -19,11 +19,13 @@
 package org.apache.hudi.sink;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieWriteConflictException;
@@ -584,6 +586,26 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
     writeClient.close();
   }
 
+  @Test
+  public void testRollbackFailedWritesWithLazyCleanPolicy() throws Exception {
+    conf.setString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.LAZY.name());
+
+    preparePipeline()
+        .consume(TestData.DATA_SET_INSERT)
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .subTaskFails(0, 0)
+        .assertEmptyEvent()
+        .rollbackLastCompleteInstantToInflight()
+        .jobFailover()
+        .subTaskFails(0, 1)
+        // the last checkpoint instant was not rolled back by subTaskFails(0, 1)
+        // with LAZY cleaning strategy because clean action could roll back failed writes.
+        .assertNextEvent()
+        .end();
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
