This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-1.1.1-rc2-prep in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d32a113381729d55b59de925610564cce827ad42 Author: Shuo Cheng <[email protected]> AuthorDate: Sat Nov 8 11:12:35 2025 +0800 feat: Support TIMELINE_SERVER_BASED markers for flink writer (#14202) --- .../org/apache/hudi/config/HoodieWriteConfig.java | 7 +++++ .../hudi/table/marker/WriteMarkersFactory.java | 2 +- .../hudi/table/marker/TestWriteMarkersFactory.java | 18 +++++++++++++ .../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 28 +++++++++++++++++++ .../apache/hudi/table/ITTestHoodieDataSource.java | 16 +++++++++++ .../apache/hudi/utils/TestFlinkWriteClients.java | 31 ++++++++++++++++++++++ 6 files changed, 101 insertions(+), 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 8b6f0e99c245..f6bc28b53e4f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -54,6 +54,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.HoodieRecordUtils; @@ -1628,6 +1629,12 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED); } + public boolean isRemoteViewStorageType() { + FileSystemViewStorageType storageType = getViewStorageConfig().getStorageType(); + return storageType == FileSystemViewStorageType.REMOTE_ONLY + || storageType == FileSystemViewStorageType.REMOTE_FIRST; + } + public int 
getEmbeddedTimelineServerPort() { return Integer.parseInt(getStringOrDefault(EMBEDDED_TIMELINE_SERVER_PORT_NUM)); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java index 790534680960..7a4a38fdb0b5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java @@ -47,7 +47,7 @@ public class WriteMarkersFactory { case DIRECT: return getDirectWriteMarkers(table, instantTime); case TIMELINE_SERVER_BASED: - if (!table.getConfig().isEmbeddedTimelineServerEnabled()) { + if (!table.getConfig().isEmbeddedTimelineServerEnabled() && !table.getConfig().isRemoteViewStorageType()) { LOG.warn("Timeline-server-based markers are configured as the marker type " + "but embedded timeline server is not enabled. 
Falling back to direct markers."); return getDirectWriteMarkers(table, instantTime); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java index f9f28ee4b759..3da4f44897e4 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java @@ -114,13 +114,31 @@ public class TestWriteMarkersFactory extends HoodieCommonTestHarness { HoodieTableVersion.SIX, true, DirectWriteMarkersV1.class); } + @Test + public void testTimelineServerBasedMarkersWithRemoteViewStorageType() { + // No fallback to direct markers should happen when the view storage type is remote + testWriteMarkersFactory( + MarkerType.TIMELINE_SERVER_BASED, NON_HDFS_BASE_PATH, + HoodieTableVersion.current(), false, true, TimelineServerBasedWriteMarkers.class); + testWriteMarkersFactory( + MarkerType.TIMELINE_SERVER_BASED, NON_HDFS_BASE_PATH, + HoodieTableVersion.SIX, false, true, TimelineServerBasedWriteMarkersV1.class); + } + private void testWriteMarkersFactory( MarkerType markerTypeConfig, String basePath, HoodieTableVersion tableVersion, boolean isTimelineServerEnabled, Class<?> expectedWriteMarkersClass) { + testWriteMarkersFactory(markerTypeConfig, basePath, tableVersion, isTimelineServerEnabled, false, expectedWriteMarkersClass); + } + + private void testWriteMarkersFactory( + MarkerType markerTypeConfig, String basePath, HoodieTableVersion tableVersion, + boolean isTimelineServerEnabled, boolean isRemoteViewStorageType, Class<?> expectedWriteMarkersClass) { String instantTime = "001"; when(table.getConfig()).thenReturn(writeConfig); when(writeConfig.isEmbeddedTimelineServerEnabled()) .thenReturn(isTimelineServerEnabled); + when(writeConfig.isRemoteViewStorageType()).thenReturn(isRemoteViewStorageType); 
when(table.getMetaClient()).thenReturn(metaClient); when(metaClient.getStorage()).thenReturn(storage); when(storage.getFileSystem()).thenReturn(fileSystem); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index c8e4dd4c6c06..eebcdd8c5c97 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -22,6 +22,8 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.WriteConcurrencyMode; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.config.HoodieCleanConfig; @@ -34,7 +36,10 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.io.FileGroupReaderBasedMergeHandle; import org.apache.hudi.io.HoodieWriteMergeHandle; import org.apache.hudi.sink.utils.TestWriteBase; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; import org.apache.hudi.util.FlinkWriteClients; +import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestData; import org.apache.flink.configuration.Configuration; @@ -45,11 +50,13 @@ import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; 
+import static org.junit.jupiter.api.Assertions.assertTrue; /** * Test cases for stream write. @@ -790,4 +797,25 @@ public class TestWriteCopyOnWrite extends TestWriteBase { .assertNextEvent() .end(); } + + @ParameterizedTest + @EnumSource(MarkerType.class) + public void testMarkType(MarkerType markerType) throws Exception { + conf.setString(HoodieWriteConfig.MARKERS_TYPE.key(), markerType.toString()); + TestHarness testHarness = + preparePipeline(conf) + .consume(TestData.DATA_SET_INSERT) + // no checkpoint, so the coordinator does not accept any events + .emptyEventBuffer() + .checkpoint(1) + .assertNextEvent(4, "par1,par2,par3,par4"); + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); + List<StoragePathInfo> files = metaClient.getStorage().listFiles(new StoragePath(metaClient.getTempFolderPath())); + if (markerType == MarkerType.DIRECT) { + assertTrue(files.stream().allMatch(f -> f.getPath().getName().endsWith("marker.CREATE"))); + } else { + assertTrue(files.stream().noneMatch(f -> f.getPath().getName().endsWith("marker.CREATE"))); + } + testHarness.checkpointComplete(1).checkWrittenData(EXPECTED1).end(); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index d8f14601ed85..ebbbebb2d79b 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieTimeline; import 
org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -2715,6 +2716,21 @@ public class ITTestHoodieDataSource { assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); } + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + void testWriteWithTimelineServerBasedMarker(HoodieTableType tableType) { + String hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.TABLE_TYPE, tableType) + .option(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.TIMELINE_SERVER_BASED.name()) + .end(); + batchTableEnv.executeSql(hoodieTableDDL); + + execInsertSql(batchTableEnv, TestSQL.INSERT_T1); + List<Row> rows = CollectionUtil.iteratorToList(batchTableEnv.executeSql("select * from t1").collect()); + assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java index d823a01f5be1..245300445ca2 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java @@ -35,9 +35,14 @@ import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.io.FileGroupReaderBasedMergeHandle; +import org.apache.hudi.table.HoodieTable; +import 
org.apache.hudi.table.marker.DirectWriteMarkers; +import org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers; +import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; @@ -52,6 +57,7 @@ import java.io.File; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -97,6 +103,31 @@ public class TestFlinkWriteClients { } } + @ParameterizedTest + @ValueSource(strings = {"", "DIRECT", "TIMELINE_SERVER_BASED"}) + void testMarkerType(String markerType) throws Exception { + // create table + StreamerUtil.initTableIfNotExists(conf); + // This is expected to be used by the driver, the client can then send requests for files view. 
+ FlinkWriteClients.createWriteClient(conf); + + // do not set the marker type to test the default behavior + if (!markerType.isEmpty()) { + conf.setString(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.valueOf(markerType).name()); + } + // This is expected to be used by the writer client + HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, true); + try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(HoodieFlinkEngineContext.DEFAULT, writeConfig)) { + HoodieTable table = writeClient.getHoodieTable(); + String markerClass = WriteMarkersFactory.get(writeConfig.getMarkersType(), table, "001").getClass().getSimpleName(); + if (markerType.isEmpty() || markerType.equals("DIRECT")) { + assertEquals(DirectWriteMarkers.class.getSimpleName(), markerClass); + } else { + assertEquals(TimelineServerBasedWriteMarkers.class.getSimpleName(), markerClass); + } + } + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testRecordMergeConfigForEventTimeOrdering(boolean useLegacyConfig) throws Exception {
