This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-1.1.1-rc2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d32a113381729d55b59de925610564cce827ad42
Author: Shuo Cheng <[email protected]>
AuthorDate: Sat Nov 8 11:12:35 2025 +0800

    feat: Support TIMELINE_SERVER_BASED markers for flink writer (#14202)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  7 +++++
 .../hudi/table/marker/WriteMarkersFactory.java     |  2 +-
 .../hudi/table/marker/TestWriteMarkersFactory.java | 18 +++++++++++++
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 28 +++++++++++++++++++
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 16 +++++++++++
 .../apache/hudi/utils/TestFlinkWriteClients.java   | 31 ++++++++++++++++++++++
 6 files changed, 101 insertions(+), 1 deletion(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 8b6f0e99c245..f6bc28b53e4f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -54,6 +54,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -1628,6 +1629,12 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED);
   }
 
+  public boolean isRemoteViewStorageType() {
+    FileSystemViewStorageType storageType = 
getViewStorageConfig().getStorageType();
+    return storageType == FileSystemViewStorageType.REMOTE_ONLY
+        || storageType == FileSystemViewStorageType.REMOTE_FIRST;
+  }
+
   public int getEmbeddedTimelineServerPort() {
     return 
Integer.parseInt(getStringOrDefault(EMBEDDED_TIMELINE_SERVER_PORT_NUM));
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java
index 790534680960..7a4a38fdb0b5 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java
@@ -47,7 +47,7 @@ public class WriteMarkersFactory {
       case DIRECT:
         return getDirectWriteMarkers(table, instantTime);
       case TIMELINE_SERVER_BASED:
-        if (!table.getConfig().isEmbeddedTimelineServerEnabled()) {
+        if (!table.getConfig().isEmbeddedTimelineServerEnabled() && 
!table.getConfig().isRemoteViewStorageType()) {
           LOG.warn("Timeline-server-based markers are configured as the marker 
type "
               + "but embedded timeline server is not enabled.  Falling back to 
direct markers.");
           return getDirectWriteMarkers(table, instantTime);
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java
index f9f28ee4b759..3da4f44897e4 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java
@@ -114,13 +114,31 @@ public class TestWriteMarkersFactory extends 
HoodieCommonTestHarness {
         HoodieTableVersion.SIX, true, DirectWriteMarkersV1.class);
   }
 
+  @Test
+  public void testTimelineServerBasedMarkersWithRemoteViewStorageType() {
+    // No fallback to direct markers should happen when the view storage type is remote
+    testWriteMarkersFactory(
+        MarkerType.TIMELINE_SERVER_BASED, NON_HDFS_BASE_PATH,
+        HoodieTableVersion.current(), false, true, 
TimelineServerBasedWriteMarkers.class);
+    testWriteMarkersFactory(
+        MarkerType.TIMELINE_SERVER_BASED, NON_HDFS_BASE_PATH,
+        HoodieTableVersion.SIX, false, true, 
TimelineServerBasedWriteMarkersV1.class);
+  }
+
   private void testWriteMarkersFactory(
       MarkerType markerTypeConfig, String basePath, HoodieTableVersion 
tableVersion,
       boolean isTimelineServerEnabled, Class<?> expectedWriteMarkersClass) {
+    testWriteMarkersFactory(markerTypeConfig, basePath, tableVersion, 
isTimelineServerEnabled, false, expectedWriteMarkersClass);
+  }
+
+  private void testWriteMarkersFactory(
+      MarkerType markerTypeConfig, String basePath, HoodieTableVersion 
tableVersion,
+      boolean isTimelineServerEnabled, boolean isRemoteViewStorageType, 
Class<?> expectedWriteMarkersClass) {
     String instantTime = "001";
     when(table.getConfig()).thenReturn(writeConfig);
     when(writeConfig.isEmbeddedTimelineServerEnabled())
         .thenReturn(isTimelineServerEnabled);
+    
when(writeConfig.isRemoteViewStorageType()).thenReturn(isRemoteViewStorageType);
     when(table.getMetaClient()).thenReturn(metaClient);
     when(metaClient.getStorage()).thenReturn(storage);
     when(storage.getFileSystem()).thenReturn(fileSystem);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index c8e4dd4c6c06..eebcdd8c5c97 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -22,6 +22,8 @@ import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.config.HoodieCleanConfig;
@@ -34,7 +36,10 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.FileGroupReaderBasedMergeHandle;
 import org.apache.hudi.io.HoodieWriteMergeHandle;
 import org.apache.hudi.sink.utils.TestWriteBase;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestData;
 
 import org.apache.flink.configuration.Configuration;
@@ -45,11 +50,13 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test cases for stream write.
@@ -790,4 +797,25 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .assertNextEvent()
         .end();
   }
+
+  @ParameterizedTest
+  @EnumSource(MarkerType.class)
+  public void testMarkType(MarkerType markerType) throws Exception {
+    conf.setString(HoodieWriteConfig.MARKERS_TYPE.key(), 
markerType.toString());
+    TestHarness testHarness =
+        preparePipeline(conf)
+            .consume(TestData.DATA_SET_INSERT)
+            // no checkpoint, so the coordinator does not accept any events
+            .emptyEventBuffer()
+            .checkpoint(1)
+            .assertNextEvent(4, "par1,par2,par3,par4");
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    List<StoragePathInfo> files =  metaClient.getStorage().listFiles(new 
StoragePath(metaClient.getTempFolderPath()));
+    if (markerType == MarkerType.DIRECT) {
+      assertTrue(files.stream().allMatch(f -> 
f.getPath().getName().endsWith("marker.CREATE")));
+    } else {
+      assertTrue(files.stream().noneMatch(f -> 
f.getPath().getName().endsWith("marker.CREATE")));
+    }
+    testHarness.checkpointComplete(1).checkWrittenData(EXPECTED1).end();
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index d8f14601ed85..ebbbebb2d79b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
+import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -2715,6 +2716,21 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testWriteWithTimelineServerBasedMarker(HoodieTableType tableType) {
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(HoodieWriteConfig.MARKERS_TYPE.key(), 
MarkerType.TIMELINE_SERVER_BASED.name())
+        .end();
+    batchTableEnv.executeSql(hoodieTableDDL);
+
+    execInsertSql(batchTableEnv, TestSQL.INSERT_T1);
+    List<Row> rows = 
CollectionUtil.iteratorToList(batchTableEnv.executeSql("select * from 
t1").collect());
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
index d823a01f5be1..245300445ca2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
@@ -35,9 +35,14 @@ import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.io.FileGroupReaderBasedMergeHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.marker.DirectWriteMarkers;
+import org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -52,6 +57,7 @@ import java.io.File;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -97,6 +103,31 @@ public class TestFlinkWriteClients {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"", "DIRECT", "TIMELINE_SERVER_BASED"})
+  void testMarkerType(String markerType) throws Exception {
+    // create table
+    StreamerUtil.initTableIfNotExists(conf);
+    // This is expected to be used by the driver; the client can then send 
requests for the files view.
+    FlinkWriteClients.createWriteClient(conf);
+
+    // do not set the marker type, to test the default behavior
+    if (!markerType.isEmpty()) {
+      conf.setString(HoodieWriteConfig.MARKERS_TYPE.key(), 
MarkerType.valueOf(markerType).name());
+    }
+    // This is expected to be used by writer client
+    HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(conf, false, true);
+    try (HoodieFlinkWriteClient writeClient = new 
HoodieFlinkWriteClient(HoodieFlinkEngineContext.DEFAULT, writeConfig)) {
+      HoodieTable table = writeClient.getHoodieTable();
+      String markerClass = 
WriteMarkersFactory.get(writeConfig.getMarkersType(), table, 
"001").getClass().getSimpleName();
+      if (markerType.isEmpty() || markerType.equals("DIRECT")) {
+        assertEquals(DirectWriteMarkers.class.getSimpleName(), markerClass);
+      } else {
+        assertEquals(TimelineServerBasedWriteMarkers.class.getSimpleName(), 
markerClass);
+      }
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   void testRecordMergeConfigForEventTimeOrdering(boolean useLegacyConfig) 
throws Exception {

Reply via email to