This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 50eb95c7d6f5 fix(flink): fix disable table service not effective in 
hudi-flink (#13875)
50eb95c7d6f5 is described below

commit 50eb95c7d6f5da986244e53627e54a703f6f8678
Author: fhan <[email protected]>
AuthorDate: Thu May 21 16:32:51 2026 +0800

    fix(flink): fix disable table service not effective in hudi-flink (#13875)
    
    make the HoodieWriteConfig.TABLE_SERVICES_ENABLED effective for Flink.
    
    ---------
    
    Co-authored-by: fhan <[email protected]>
    Co-authored-by: danny0405 <[email protected]>
---
 .../apache/hudi/configuration/FlinkOptions.java    |  6 ++
 .../apache/hudi/configuration/OptionsResolver.java | 36 ++++++++++--
 .../org/apache/hudi/sink/v2/utils/PipelinesV2.java |  8 ++-
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |  6 +-
 .../org/apache/hudi/table/HoodieTableSink.java     |  6 +-
 .../hudi/configuration/TestOptionsResolver.java    | 68 ++++++++++++++++++++++
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 29 +++++++++
 .../hudi/sink/TestWriteMergeOnReadWithCompact.java | 31 ++++++++++
 8 files changed, 178 insertions(+), 12 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 4910c721e09c..fbe29f93ef5b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -188,6 +188,12 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(false) // keep sync with hoodie style
       .withDescription("If enabled, the checkpoint Id will also be written to 
hudi metadata.");
 
+  public static final ConfigOption<Boolean> TABLE_SERVICES_ENABLED = 
ConfigOptions
+      .key(HoodieWriteConfig.TABLE_SERVICES_ENABLED.key())
+      .booleanType()
+      .defaultValue(HoodieWriteConfig.TABLE_SERVICES_ENABLED.defaultValue())
+      .withDescription("Master control to disable all table services including 
archive, clean, compact, cluster, etc.");
+
   // ------------------------------------------------------------------------
   //  Changelog Capture Options
   // ------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 774a52231e8d..66e6e2815a5a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -296,7 +296,7 @@ public class OptionsResolver {
    * @param conf The flink configuration.
    */
   public static boolean needsAsyncCompaction(Configuration conf) {
-    return OptionsResolver.isMorTable(conf) && 
conf.get(FlinkOptions.COMPACTION_ASYNC_ENABLED);
+    return OptionsResolver.isMorTable(conf) && areTableServicesEnabled(conf) 
&& conf.get(FlinkOptions.COMPACTION_ASYNC_ENABLED);
   }
 
   /**
@@ -305,7 +305,7 @@ public class OptionsResolver {
    * @param conf The flink configuration.
    */
   public static boolean needsAsyncMetadataCompaction(Configuration conf) {
-    return isStreamingIndexWriteEnabled(conf) && 
conf.get(FlinkOptions.METADATA_COMPACTION_ASYNC_ENABLED);
+    return isStreamingIndexWriteEnabled(conf) && areTableServicesEnabled(conf) 
&& conf.get(FlinkOptions.METADATA_COMPACTION_ASYNC_ENABLED);
   }
 
   /**
@@ -314,7 +314,7 @@ public class OptionsResolver {
    * @param conf The flink configuration.
    */
   public static boolean needsScheduleMdtCompaction(Configuration conf) {
-    return isStreamingIndexWriteEnabled(conf) && 
conf.get(FlinkOptions.METADATA_COMPACTION_SCHEDULE_ENABLED);
+    return isStreamingIndexWriteEnabled(conf) && areTableServicesEnabled(conf) 
&& conf.get(FlinkOptions.METADATA_COMPACTION_SCHEDULE_ENABLED);
   }
 
   /**
@@ -324,7 +324,9 @@ public class OptionsResolver {
    */
   public static boolean needsScheduleCompaction(Configuration conf) {
     return OptionsResolver.isMorTable(conf)
-        && conf.get(FlinkOptions.COMPACTION_SCHEDULE_ENABLED) && 
!isAppendMode(conf);
+        && areTableServicesEnabled(conf)
+        && conf.get(FlinkOptions.COMPACTION_SCHEDULE_ENABLED)
+        && !isAppendMode(conf);
   }
 
   /**
@@ -333,7 +335,7 @@ public class OptionsResolver {
    * @param conf The flink configuration.
    */
   public static boolean needsAsyncClustering(Configuration conf) {
-    return isInsertOperation(conf) && 
conf.get(FlinkOptions.CLUSTERING_ASYNC_ENABLED);
+    return isInsertOperation(conf) && areTableServicesEnabled(conf) && 
conf.get(FlinkOptions.CLUSTERING_ASYNC_ENABLED);
   }
 
   /**
@@ -342,6 +344,9 @@ public class OptionsResolver {
    * @param conf The flink configuration.
    */
   public static boolean needsScheduleClustering(Configuration conf) {
+    if (!areTableServicesEnabled(conf)) {
+      return false;
+    }
     if (!conf.get(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED)) {
       return false;
     }
@@ -548,6 +553,20 @@ public class OptionsResolver {
     return 
WriteConcurrencyMode.isNonBlockingConcurrencyControl(config.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
 HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()));
   }
 
+  /**
+   * Returns whether the cleaning for failed writes is enabled as lazy.
+   */
+  public static boolean isLazyFailedWritesCleaning(Configuration conf) {
+    return needsAsyncCleaning(conf) && isLazyFailedWritesCleanPolicy(conf);
+  }
+
+  /**
+   * Returns whether there is need for async cleaning (planning & execution).
+   */
+  public static boolean needsAsyncCleaning(Configuration conf) {
+    return areTableServicesEnabled(conf);
+  }
+
   /**
    * Returns whether Cleaner's failed writes policy is set to lazy
    */
@@ -578,6 +597,13 @@ public class OptionsResolver {
     return (isCowTable(conf) || conf.get(FlinkOptions.CDC_ENABLED)) && 
isUpsertOperation(conf);
   }
 
+  /**
+   * Returns whether table services are enabled.
+   */
+  public static boolean areTableServicesEnabled(Configuration conf) {
+    return conf.get(FlinkOptions.TABLE_SERVICES_ENABLED);
+  }
+
   /**
    * Returns the customized insert partitioner instance.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
index 6e93c02bc137..107e8673c42b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
@@ -119,7 +119,7 @@ public class PipelinesV2 {
       DataStream<RowData> pipeline = Pipelines.append(conf, rowType, 
dataStream);
       if (OptionsResolver.needsAsyncClustering(conf)) {
         return clusterV2(conf, rowType, pipeline);
-      } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
+      } else if (OptionsResolver.isLazyFailedWritesCleaning(conf)) {
         // add clean function to rollback failed writes for lazy failed writes 
cleaning policy
         return cleanV2(conf, pipeline);
       } else {
@@ -138,8 +138,10 @@ public class PipelinesV2 {
         conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED, 
false);
       }
       return compactV2(conf, pipeline);
-    } else {
+    } else if (OptionsResolver.needsAsyncCleaning(conf)) {
       return cleanV2(conf, pipeline);
+    } else {
+      return pipeline;
     }
   }
 
@@ -156,7 +158,7 @@ public class PipelinesV2 {
     if (OptionsResolver.isBulkInsertOperation(conf)) {
       return conf.get(FlinkOptions.WRITE_TASKS);
     } else if (OptionsResolver.isAppendMode(conf)) {
-      return OptionsResolver.needsAsyncClustering(conf) || 
OptionsResolver.isLazyFailedWritesCleanPolicy(conf)
+      return OptionsResolver.needsAsyncClustering(conf) || 
OptionsResolver.isLazyFailedWritesCleaning(conf)
           ? 1 : conf.get(FlinkOptions.WRITE_TASKS);
     } else {
       return 1;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 721b3f94e470..58be515ca901 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -97,7 +97,7 @@ public class HoodieFlinkStreamer {
       pipeline = Pipelines.append(conf, rowType, dataStream);
       if (OptionsResolver.needsAsyncClustering(conf)) {
         Pipelines.cluster(conf, rowType, pipeline);
-      } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
+      } else if (OptionsResolver.isLazyFailedWritesCleaning(conf)) {
         // add clean function to rollback failed writes for lazy failed writes 
cleaning policy
         Pipelines.clean(conf, pipeline);
       } else {
@@ -108,8 +108,10 @@ public class HoodieFlinkStreamer {
       pipeline = Pipelines.hoodieStreamWrite(conf, rowType, 
hoodieRecordDataStream);
       if (OptionsResolver.needsAsyncCompaction(conf)) {
         Pipelines.compact(conf, pipeline);
-      } else {
+      } else if (OptionsResolver.needsAsyncCleaning(conf)) {
         Pipelines.clean(conf, pipeline);
+      } else {
+        Pipelines.dummySink(pipeline);
       }
     }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index f46e2e672222..45bfde998af6 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -112,7 +112,7 @@ public class HoodieTableSink implements
         DataStream<RowData> pipeline = Pipelines.append(conf, rowType, 
dataStream);
         if (OptionsResolver.needsAsyncClustering(conf)) {
           return Pipelines.cluster(conf, rowType, pipeline);
-        } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
+        } else if (OptionsResolver.isLazyFailedWritesCleaning(conf)) {
           // add clean function to rollback failed writes for lazy failed 
writes cleaning policy
           return Pipelines.clean(conf, pipeline);
         } else {
@@ -131,8 +131,10 @@ public class HoodieTableSink implements
           conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED, 
false);
         }
         return Pipelines.compact(conf, pipeline);
-      } else {
+      } else if (OptionsResolver.needsAsyncCleaning(conf)) {
         return Pipelines.clean(conf, pipeline);
+      } else {
+        return Pipelines.dummySink(pipeline);
       }
     };
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
index 6c61d5074432..f7266891e1de 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.configuration;
 
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieCleanConfig;
@@ -123,4 +124,71 @@ public class TestOptionsResolver {
     conf.set(FlinkOptions.PATH, tempFile.getAbsolutePath());
     return conf;
   }
+
+  @Test
+  void testAreTableServicesEnabled() {
+    Configuration conf = new Configuration();
+    // default value should be true
+    assertTrue(OptionsResolver.areTableServicesEnabled(conf));
+
+    // explicitly set to true
+    conf.set(FlinkOptions.TABLE_SERVICES_ENABLED, true);
+    assertTrue(OptionsResolver.areTableServicesEnabled(conf));
+
+    // explicitly set to false
+    conf.set(FlinkOptions.TABLE_SERVICES_ENABLED, false);
+    assertFalse(OptionsResolver.areTableServicesEnabled(conf));
+  }
+
+  @Test
+  void testTableServicesGateCompactionAndCleaning() {
+    Configuration conf = getConf();
+    conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+    conf.setString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), 
HoodieFailedWritesCleaningPolicy.LAZY.name());
+
+    assertTrue(OptionsResolver.needsAsyncCompaction(conf));
+    assertTrue(OptionsResolver.needsScheduleCompaction(conf));
+    assertTrue(OptionsResolver.needsAsyncCleaning(conf));
+    assertTrue(OptionsResolver.isLazyFailedWritesCleanPolicy(conf));
+    assertTrue(OptionsResolver.isLazyFailedWritesCleaning(conf));
+
+    conf.set(FlinkOptions.TABLE_SERVICES_ENABLED, false);
+
+    assertFalse(OptionsResolver.needsAsyncCompaction(conf));
+    assertFalse(OptionsResolver.needsScheduleCompaction(conf));
+    assertFalse(OptionsResolver.needsAsyncCleaning(conf));
+    assertTrue(OptionsResolver.isLazyFailedWritesCleanPolicy(conf));
+    assertFalse(OptionsResolver.isLazyFailedWritesCleaning(conf));
+  }
+
+  @Test
+  void testTableServicesGateMetadataCompaction() {
+    Configuration conf = getConf();
+    conf.set(FlinkOptions.METADATA_ENABLED, true);
+    conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.RECORD_LEVEL_INDEX.name());
+
+    assertTrue(OptionsResolver.needsAsyncMetadataCompaction(conf));
+    assertTrue(OptionsResolver.needsScheduleMdtCompaction(conf));
+
+    conf.set(FlinkOptions.TABLE_SERVICES_ENABLED, false);
+
+    assertFalse(OptionsResolver.needsAsyncMetadataCompaction(conf));
+    assertFalse(OptionsResolver.needsScheduleMdtCompaction(conf));
+  }
+
+  @Test
+  void testTableServicesGateClustering() {
+    Configuration conf = getConf();
+    conf.set(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
+    conf.set(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
+    conf.set(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
+
+    assertTrue(OptionsResolver.needsAsyncClustering(conf));
+    assertTrue(OptionsResolver.needsScheduleClustering(conf));
+
+    conf.set(FlinkOptions.TABLE_SERVICES_ENABLED, false);
+
+    assertFalse(OptionsResolver.needsAsyncClustering(conf));
+    assertFalse(OptionsResolver.needsScheduleClustering(conf));
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index a4e869428c24..69087122a2e8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -359,6 +359,35 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .end();
   }
 
+  @Test
+  public void testInsertWithTableServiceDisabled() throws Exception {
+    // reset the config option
+    conf.set(FlinkOptions.OPERATION, "insert");
+    conf.set(FlinkOptions.TABLE_SERVICES_ENABLED, false);
+    conf.set(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
+    conf.set(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
+    conf.set(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
+
+    preparePipeline(conf)
+        .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+        .checkpoint(1)
+        .handleEvents(1)
+        .checkpointComplete(1)
+        .checkWrittenData(EXPECTED4, 1)
+        // insert duplicates again
+        .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+        .checkpoint(2)
+        .handleEvents(1)
+        .checkpointComplete(2)
+        .checkWrittenDataCOW(EXPECTED5)
+        .end();
+    HoodieFlinkWriteClient writeClient = 
FlinkWriteClients.createWriteClient(conf);
+    long completedReplaceCommit = 
writeClient.getHoodieTable().getActiveTimeline().getCompletedReplaceTimeline().getInstants().stream().count();
+    long pendingReplaceCommit = 
writeClient.getHoodieTable().getActiveTimeline().filterPendingReplaceTimeline().getInstants().stream().count();
+    assertEquals(0, completedReplaceCommit);
+    assertEquals(0, pendingReplaceCommit);
+  }
+
   @Test
   public void testUpsert() throws Exception {
     // open the function and ingest data
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 7515f8904327..3f24c2763faf 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.PartialUpdateAvroPayload;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -42,7 +43,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
 import static org.apache.hudi.utils.TestData.insertRow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 /**
@@ -56,6 +59,34 @@ public class TestWriteMergeOnReadWithCompact extends 
TestWriteCopyOnWrite {
     conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
   }
 
+  @Test
+  public void testUpsertWithTableServiceDisabled() throws Exception {
+    // reset the config option
+    conf.set(FlinkOptions.TABLE_SERVICES_ENABLED, false);
+    conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true);
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+
+    preparePipeline(conf)
+        .consume(TestData.DATA_SET_INSERT)
+        .assertEmptyDataFiles()
+        .checkpoint(1)
+        .handleEvents(1)
+        .checkpointComplete(1)
+        .consume(TestData.DATA_SET_INSERT)
+        .checkpoint(2)
+        .handleEvents(1)
+        .checkpointComplete(2)
+        .end();
+    HoodieFlinkWriteClient writeClient = 
FlinkWriteClients.createWriteClient(conf);
+    long completedCompaction = 
writeClient.getHoodieTable().getActiveTimeline().getInstants().stream()
+        .filter(s -> s.getAction().equals(COMPACTION_ACTION))
+        .filter(HoodieInstant::isCompleted).count();
+    long pendingCompaction = 
writeClient.getHoodieTable().getActiveTimeline().filterPendingCompactionTimeline().getInstants().stream().count();
+    assertEquals(0, completedCompaction);
+    assertEquals(0, pendingCompaction);
+  }
+
   @Test
   public void testPartialFailover() {
     // partial failover is only valid for append mode.

Reply via email to