This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ce58d02aa36 [HUDI-6336] Support flink timeline-based ckp metadata 
(#9651)
ce58d02aa36 is described below

commit ce58d02aa367cc0a030d70e2225bb3cc191d2820
Author: StreamingFlames <[email protected]>
AuthorDate: Thu Sep 28 20:32:32 2023 -0500

    [HUDI-6336] Support flink timeline-based ckp metadata (#9651)
---
 .../client/embedded/EmbeddedTimelineService.java   |   6 +
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  25 ++++
 .../marker/TimelineServerBasedWriteMarkers.java    |  56 ++-------
 .../org/apache/hudi/util/HttpRequestClient.java    | 102 ++++++++++++++++
 .../common/table/timeline/dto/InstantStateDTO.java |  68 +++++++++++
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |   8 +-
 .../hudi/sink/bootstrap/BootstrapOperator.java     |   3 +-
 .../hudi/sink/bulk/BulkInsertWriteFunction.java    |   3 +-
 .../sink/common/AbstractStreamWriteFunction.java   |   3 +-
 .../org/apache/hudi/sink/meta/CkpMetadata.java     |  33 ++----
 .../apache/hudi/sink/meta/CkpMetadataFactory.java  |  43 +++++++
 .../hudi/sink/meta/TimelineBasedCkpMetadata.java   | 112 ++++++++++++++++++
 .../TestWriteWithTimelineBasedCkpMetadata.java     |  63 ++++++++++
 .../org/apache/hudi/sink/meta/TestCkpMetadata.java |  34 ++++--
 .../sink/meta/TestTimelineBasedCkpMetadata.java    | 117 ++++++++++++++++++
 .../org/apache/hudi/sink/utils/TestWriteBase.java  |  14 ++-
 .../hudi/timeline/service/RequestHandler.java      |  31 ++++-
 .../hudi/timeline/service/TimelineService.java     |  22 ++++
 .../service/handlers/InstantStateHandler.java      | 131 +++++++++++++++++++++
 19 files changed, 789 insertions(+), 85 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
index 7d794366ba0..feda7d2b185 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
@@ -100,6 +100,12 @@ public class EmbeddedTimelineService {
                   * writeConfig.getHoodieClientHeartbeatTolerableMisses());
     }
 
+    if (writeConfig.isTimelineServerBasedInstantStateEnabled()) {
+      timelineServiceConfBuilder
+          
.instantStateForceRefreshRequestNumber(writeConfig.getTimelineServerBasedInstantStateForceRefreshRequestNumber())
+          .enableInstantStateRequests(true);
+    }
+
     server = new TimelineService(context, hadoopConf.newCopy(), 
timelineServiceConfBuilder.build(),
         FSUtils.getFs(basePath, hadoopConf.newCopy()), viewManager);
     serverPort = server.startService();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 367f9388726..87734c042d3 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -428,6 +428,18 @@ public class HoodieWriteConfig extends HoodieConfig {
       .sinceVersion("0.9.0")
       .withDocumentation(MarkerType.class);
 
+  public static final ConfigProperty<Boolean> 
INSTANT_STATE_TIMELINE_SERVER_BASED = ConfigProperty
+      .key("hoodie.instant_state.timeline_server_based.enabled")
+      .defaultValue(false)
+      .sinceVersion("1.0.0")
+      .withDocumentation("If enabled, writers get instant state from timeline 
server rather than requesting DFS directly");
+
+  public static final ConfigProperty<Integer> 
INSTANT_STATE_TIMELINE_SERVER_BASED_FORCE_REFRESH_REQUEST_NUMBER = 
ConfigProperty
+      
.key("hoodie.instant_state.timeline_server_based.force_refresh.request.number")
+      .defaultValue(100)
+      .sinceVersion("1.0.0")
+      .withDocumentation("Number of requests to trigger instant state cache 
refreshing");
+
   public static final ConfigProperty<Integer> 
MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS = ConfigProperty
       .key("hoodie.markers.timeline_server_based.batch.num_threads")
       .defaultValue(20)
@@ -1355,6 +1367,14 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getInt(FINALIZE_WRITE_PARALLELISM_VALUE);
   }
 
+  public boolean isTimelineServerBasedInstantStateEnabled() {
+    return getBoolean(INSTANT_STATE_TIMELINE_SERVER_BASED);
+  }
+
+  public int getTimelineServerBasedInstantStateForceRefreshRequestNumber() {
+    return 
getInt(INSTANT_STATE_TIMELINE_SERVER_BASED_FORCE_REFRESH_REQUEST_NUMBER);
+  }
+
   public MarkerType getMarkersType() {
     String markerType = getString(MARKERS_TYPE);
     return MarkerType.valueOf(markerType.toUpperCase());
@@ -2930,6 +2950,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withTimelineServerBasedInstantStateEnable(boolean enable) {
+      writeConfig.setValue(INSTANT_STATE_TIMELINE_SERVER_BASED, 
String.valueOf(enable));
+      return this;
+    }
+
     public Builder withBulkInsertSortMode(String mode) {
       writeConfig.setValue(BULK_INSERT_SORT_MODE, mode);
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
index 9d6b7f9b9a9..a93d4fc91fb 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
@@ -29,13 +29,11 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
 import org.apache.hudi.exception.HoodieRemoteException;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.util.HttpRequestClient;
+import org.apache.hudi.util.HttpRequestClient.RequestMethod;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.fs.Path;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.client.fluent.Response;
-import org.apache.http.client.utils.URIBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,10 +62,8 @@ import static 
org.apache.hudi.common.table.marker.MarkerOperation.MARKER_NAME_PA
  */
 public class TimelineServerBasedWriteMarkers extends WriteMarkers {
   private static final Logger LOG = 
LoggerFactory.getLogger(TimelineServerBasedWriteMarkers.class);
-  private final ObjectMapper mapper;
-  private final String timelineServerHost;
-  private final int timelineServerPort;
-  private final int timeoutSecs;
+
+  private final HttpRequestClient httpRequestClient;
 
   public TimelineServerBasedWriteMarkers(HoodieTable table, String 
instantTime) {
     this(table.getMetaClient().getBasePath(),
@@ -80,17 +76,14 @@ public class TimelineServerBasedWriteMarkers extends 
WriteMarkers {
   TimelineServerBasedWriteMarkers(String basePath, String markerFolderPath, 
String instantTime,
                                   String timelineServerHost, int 
timelineServerPort, int timeoutSecs) {
     super(basePath, markerFolderPath, instantTime);
-    this.mapper = new ObjectMapper();
-    this.timelineServerHost = timelineServerHost;
-    this.timelineServerPort = timelineServerPort;
-    this.timeoutSecs = timeoutSecs;
+    this.httpRequestClient = new HttpRequestClient(timelineServerHost, 
timelineServerPort, timeoutSecs, 0);
   }
 
   @Override
   public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism) 
{
     Map<String, String> paramsMap = 
Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
     try {
-      return executeRequestToTimelineServer(
+      return httpRequestClient.executeRequest(
           DELETE_MARKER_DIR_URL, paramsMap, new TypeReference<Boolean>() {}, 
RequestMethod.POST);
     } catch (IOException e) {
       throw new HoodieRemoteException("Failed to delete marker directory " + 
markerDirPath.toString(), e);
@@ -101,7 +94,7 @@ public class TimelineServerBasedWriteMarkers extends 
WriteMarkers {
   public boolean doesMarkerDirExist() {
     Map<String, String> paramsMap = 
Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
     try {
-      return executeRequestToTimelineServer(
+      return httpRequestClient.executeRequest(
           MARKERS_DIR_EXISTS_URL, paramsMap, new TypeReference<Boolean>() {}, 
RequestMethod.GET);
     } catch (IOException e) {
       throw new HoodieRemoteException("Failed to check marker directory " + 
markerDirPath.toString(), e);
@@ -112,7 +105,7 @@ public class TimelineServerBasedWriteMarkers extends 
WriteMarkers {
   public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, 
int parallelism) throws IOException {
     Map<String, String> paramsMap = 
Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
     try {
-      Set<String> markerPaths = executeRequestToTimelineServer(
+      Set<String> markerPaths = httpRequestClient.executeRequest(
           CREATE_AND_MERGE_MARKERS_URL, paramsMap, new 
TypeReference<Set<String>>() {}, RequestMethod.GET);
       return 
markerPaths.stream().map(WriteMarkers::stripMarkerSuffix).collect(Collectors.toSet());
     } catch (IOException e) {
@@ -125,7 +118,7 @@ public class TimelineServerBasedWriteMarkers extends 
WriteMarkers {
   public Set<String> allMarkerFilePaths() {
     Map<String, String> paramsMap = 
Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
     try {
-      return executeRequestToTimelineServer(
+      return httpRequestClient.executeRequest(
           ALL_MARKERS_URL, paramsMap, new TypeReference<Set<String>>() {}, 
RequestMethod.GET);
     } catch (IOException e) {
       throw new HoodieRemoteException("Failed to get all markers in " + 
markerDirPath.toString(), e);
@@ -179,9 +172,9 @@ public class TimelineServerBasedWriteMarkers extends 
WriteMarkers {
   private boolean executeCreateMarkerRequest(Map<String, String> paramsMap, 
String partitionPath, String markerFileName) {
     boolean success;
     try {
-      success = executeRequestToTimelineServer(
+      success = httpRequestClient.executeRequest(
           CREATE_MARKER_URL, paramsMap, new TypeReference<Boolean>() {
-          }, RequestMethod.POST);
+          }, HttpRequestClient.RequestMethod.POST);
     } catch (IOException e) {
       throw new HoodieRemoteException("Failed to create marker file " + 
partitionPath + "/" + markerFileName, e);
     }
@@ -212,31 +205,4 @@ public class TimelineServerBasedWriteMarkers extends 
WriteMarkers {
     return paramsMap;
   }
 
-  private <T> T executeRequestToTimelineServer(String requestPath, Map<String, 
String> queryParameters,
-                                               TypeReference reference, 
RequestMethod method) throws IOException {
-    URIBuilder builder =
-        new 
URIBuilder().setHost(timelineServerHost).setPort(timelineServerPort).setPath(requestPath).setScheme("http");
-
-    queryParameters.forEach(builder::addParameter);
-
-    String url = builder.toString();
-    LOG.debug("Sending request : (" + url + ")");
-    Response response;
-    int timeout = this.timeoutSecs * 1000; // msec
-    switch (method) {
-      case GET:
-        response = 
Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
-        break;
-      case POST:
-      default:
-        response = 
Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute();
-        break;
-    }
-    String content = response.returnContent().asString();
-    return (T) mapper.readValue(content, reference);
-  }
-
-  private enum RequestMethod {
-    GET, POST
-  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/HttpRequestClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/HttpRequestClient.java
new file mode 100644
index 00000000000..65131cc7742
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/HttpRequestClient.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Helper class for executing timeline server requests.
+ */
+public class HttpRequestClient {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HttpRequestClient.class);
+  private final ObjectMapper mapper;
+  private final String serverHost;
+  private final int serverPort;
+  private final int timeoutSecs;
+  private final int maxRetry;
+
+  public HttpRequestClient(HoodieWriteConfig writeConfig) {
+    this(writeConfig.getViewStorageConfig().getRemoteViewServerHost(),
+        writeConfig.getViewStorageConfig().getRemoteViewServerPort(),
+        
writeConfig.getViewStorageConfig().getRemoteTimelineClientTimeoutSecs(),
+        
writeConfig.getViewStorageConfig().getRemoteTimelineClientMaxRetryNumbers());
+  }
+
+  public HttpRequestClient(String serverHost, int serverPort, int timeoutSecs, 
int maxRetry) {
+    this.mapper = new ObjectMapper();
+    this.serverHost = serverHost;
+    this.serverPort = serverPort;
+    this.timeoutSecs = timeoutSecs;
+    this.maxRetry = maxRetry;
+  }
+
+  public <T> T executeRequestWithRetry(String requestPath, Map<String, String> 
queryParameters,
+                                       TypeReference reference, RequestMethod 
method) {
+    int retry = maxRetry;
+    while (--retry >= 0) {
+      try {
+        return executeRequest(requestPath, queryParameters, reference, method);
+      } catch (IOException e) {
+        LOG.warn("Failed to execute request (" + requestPath + ") to timeline 
server", e);
+      }
+    }
+    throw new HoodieException("Failed to execute timeline server request (" + 
requestPath + ")");
+  }
+
+  public <T> T executeRequest(String requestPath, Map<String, String> 
queryParameters,
+                              TypeReference reference, RequestMethod method) 
throws IOException {
+    URIBuilder builder =
+        new 
URIBuilder().setHost(serverHost).setPort(serverPort).setPath(requestPath).setScheme("http");
+
+    queryParameters.forEach(builder::addParameter);
+
+    String url = builder.toString();
+    LOG.debug("Sending request : (" + url + ")");
+    Response response;
+    int timeout = this.timeoutSecs * 1000; // msec
+    switch (method) {
+      case GET:
+        response = 
Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
+        break;
+      case POST:
+      default:
+        response = 
Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute();
+        break;
+    }
+    String content = response.returnContent().asString();
+    return (T) mapper.readValue(content, reference);
+  }
+
+  public enum RequestMethod {
+    GET, POST
+  }
+
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/InstantStateDTO.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/InstantStateDTO.java
new file mode 100644
index 00000000000..832f4aa4b8e
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/InstantStateDTO.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline.dto;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.FileStatus;
+
+/**
+ * Data transfer object for instant state.
+ *
+ * see org.apache.hudi.sink.meta.CkpMessage.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class InstantStateDTO {
+
+  /**
+   * The instant time.
+   *
+   * @see org.apache.hudi.sink.meta.CkpMessage#instant
+   */
+  @JsonProperty("instant")
+  String instant;
+
+  /**
+   * The instant state.
+   *
+   * @see org.apache.hudi.sink.meta.CkpMessage#state
+   */
+  @JsonProperty("state")
+  String state;
+
+  public static InstantStateDTO fromFileStatus(FileStatus fileStatus) {
+    InstantStateDTO ret = new InstantStateDTO();
+    String fileName = fileStatus.getPath().getName();
+    String[] nameAndExt = fileName.split("\\.");
+    ValidationUtils.checkState(nameAndExt.length == 2);
+    ret.instant = nameAndExt[0];
+    ret.state = nameAndExt[1];
+    return ret;
+  }
+
+  public String getInstant() {
+    return instant;
+  }
+
+  public String getState() {
+    return state;
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 34d8322dd9d..c580d43c351 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -37,6 +38,7 @@ import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.meta.CkpMetadata;
+import org.apache.hudi.sink.meta.CkpMetadataFactory;
 import org.apache.hudi.sink.utils.HiveSyncContext;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.util.ClientIds;
@@ -186,9 +188,9 @@ public class StreamWriteOperatorCoordinator
     this.gateways = new SubtaskGateway[this.parallelism];
     // init table, create if not exists.
     this.metaClient = initTableIfNotExists(this.conf);
-    this.ckpMetadata = initCkpMetadata(this.metaClient, this.conf);
     // the write client must create after the table creation
     this.writeClient = FlinkWriteClients.createWriteClient(conf);
+    this.ckpMetadata = initCkpMetadata(writeClient.getConfig(), this.conf);
     initMetadataTable(this.writeClient);
     this.tableState = TableState.create(conf);
     // start the executor
@@ -349,8 +351,8 @@ public class StreamWriteOperatorCoordinator
     writeClient.initMetadataTable();
   }
 
-  private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient, 
Configuration conf) throws IOException {
-    CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient, 
conf.getString(FlinkOptions.WRITE_CLIENT_ID));
+  private static CkpMetadata initCkpMetadata(HoodieWriteConfig writeConfig, 
Configuration conf) throws IOException {
+    CkpMetadata ckpMetadata = CkpMetadataFactory.getCkpMetadata(writeConfig, 
conf);
     ckpMetadata.bootstrap();
     return ckpMetadata;
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 1bdfeb7296b..1c6c7fca650 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -40,6 +40,7 @@ import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
 import org.apache.hudi.sink.meta.CkpMetadata;
+import org.apache.hudi.sink.meta.CkpMetadataFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.util.FlinkTables;
@@ -131,7 +132,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
     this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
     this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, 
true);
     this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, 
getRuntimeContext());
-    this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient(), 
this.conf.getString(FlinkOptions.WRITE_CLIENT_ID));
+    this.ckpMetadata = CkpMetadataFactory.getCkpMetadata(writeConfig, conf);
     this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
 
     preLoadIndexRecords();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index ec4e56c2d1d..d44ef25ee4b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -26,6 +26,7 @@ import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.meta.CkpMetadata;
+import org.apache.hudi.sink.meta.CkpMetadataFactory;
 import org.apache.hudi.sink.utils.TimeWait;
 import org.apache.hudi.util.FlinkWriteClients;
 
@@ -113,7 +114,7 @@ public class BulkInsertWriteFunction<I>
   public void open(Configuration parameters) throws IOException {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
     this.writeClient = FlinkWriteClients.createWriteClient(this.config, 
getRuntimeContext());
-    this.ckpMetadata = CkpMetadata.getInstance(config);
+    this.ckpMetadata = 
CkpMetadataFactory.getCkpMetadata(writeClient.getConfig(), config);
     this.initInstant = lastPendingInstant();
     sendBootstrapEvent();
     initWriterHelper();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index 3bd19fa0699..e0bb71b90c7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -28,6 +28,7 @@ import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.meta.CkpMetadata;
+import org.apache.hudi.sink.meta.CkpMetadataFactory;
 import org.apache.hudi.sink.utils.TimeWait;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
@@ -148,7 +149,7 @@ public abstract class AbstractStreamWriteFunction<I>
             TypeInformation.of(WriteMetadataEvent.class)
         ));
 
-    this.ckpMetadata = CkpMetadata.getInstance(this.metaClient, 
this.config.getString(FlinkOptions.WRITE_CLIENT_ID));
+    this.ckpMetadata = 
CkpMetadataFactory.getCkpMetadata(writeClient.getConfig(), config);
     this.currentInstant = lastPendingInstant();
     if (context.isRestored()) {
       restoreWriteMetadata();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index 9b0457845e9..90636bf6ac5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -18,16 +18,12 @@
 
 package org.apache.hudi.sink.meta;
 
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieException;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -41,6 +37,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * The checkpoint metadata for bookkeeping the checkpoint messages.
@@ -76,12 +73,7 @@ public class CkpMetadata implements Serializable, 
AutoCloseable {
   private List<CkpMessage> messages;
   private List<String> instantCache;
 
-  private CkpMetadata(Configuration config) {
-    this(FSUtils.getFs(config.getString(FlinkOptions.PATH), 
HadoopConfigurations.getHadoopConf(config)),
-        config.getString(FlinkOptions.PATH), 
config.getString(FlinkOptions.WRITE_CLIENT_ID));
-  }
-
-  private CkpMetadata(FileSystem fs, String basePath, String uniqueId) {
+  CkpMetadata(FileSystem fs, String basePath, String uniqueId) {
     this.fs = fs;
     this.path = new Path(ckpMetaPath(basePath, uniqueId));
   }
@@ -211,17 +203,6 @@ public class CkpMetadata implements Serializable, 
AutoCloseable {
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
-  public static CkpMetadata getInstance(Configuration config) {
-    return new CkpMetadata(config);
-  }
-
-  public static CkpMetadata getInstance(HoodieTableMetaClient metaClient, 
String uniqueId) {
-    return new CkpMetadata(metaClient.getFs(), metaClient.getBasePath(), 
uniqueId);
-  }
-
-  public static CkpMetadata getInstance(FileSystem fs, String basePath, String 
uniqueId) {
-    return new CkpMetadata(fs, basePath, uniqueId);
-  }
 
   protected static String ckpMetaPath(String basePath, String uniqueId) {
     // .hoodie/.aux/ckp_meta
@@ -233,12 +214,16 @@ public class CkpMetadata implements Serializable, 
AutoCloseable {
     return new Path(path, fileName);
   }
 
-  private List<CkpMessage> scanCkpMetadata(Path ckpMetaPath) throws 
IOException {
+  protected Stream<CkpMessage> fetchCkpMessages(Path ckpMetaPath) throws 
IOException {
     // This is required when the storage is minio
     if (!this.fs.exists(ckpMetaPath)) {
-      return new ArrayList<>();
+      return Stream.empty();
     }
-    return Arrays.stream(this.fs.listStatus(ckpMetaPath)).map(CkpMessage::new)
+    return Arrays.stream(this.fs.listStatus(ckpMetaPath)).map(CkpMessage::new);
+  }
+
+  protected List<CkpMessage> scanCkpMetadata(Path ckpMetaPath) throws 
IOException {
+    return fetchCkpMessages(ckpMetaPath)
         
.collect(Collectors.groupingBy(CkpMessage::getInstant)).values().stream()
         .map(messages -> messages.stream().reduce((x, y) -> {
           // Pick the one with the highest state
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadataFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadataFactory.java
new file mode 100644
index 00000000000..852f8a1b260
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadataFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.meta;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * A factory to generate {@link CkpMetadata} instance based on whether {@link 
HoodieWriteConfig#INSTANT_STATE_TIMELINE_SERVER_BASED} enabled.
+ */
+public class CkpMetadataFactory {
+  public static CkpMetadata getCkpMetadata(HoodieWriteConfig writeConfig, 
Configuration conf) {
+    FileSystem fs = FSUtils.getFs(conf.getString(FlinkOptions.PATH), 
HadoopConfigurations.getHadoopConf(conf));
+    String basePath = conf.getString(FlinkOptions.PATH);
+    String uniqueId = conf.getString(FlinkOptions.WRITE_CLIENT_ID);
+    if (writeConfig.isEmbeddedTimelineServerEnabled() && 
writeConfig.isTimelineServerBasedInstantStateEnabled()) {
+      return new TimelineBasedCkpMetadata(fs, basePath, uniqueId, writeConfig);
+    } else {
+      return new CkpMetadata(fs, basePath, uniqueId);
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/TimelineBasedCkpMetadata.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/TimelineBasedCkpMetadata.java
new file mode 100644
index 00000000000..44229f36d44
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/TimelineBasedCkpMetadata.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.meta;
+
+import org.apache.hudi.common.table.timeline.dto.InstantStateDTO;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.timeline.service.handlers.InstantStateHandler;
+import org.apache.hudi.util.HttpRequestClient;
+import org.apache.hudi.util.HttpRequestClient.RequestMethod;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+/**
+ * Timeline server based CkpMetadata, will read ckpMessages from 
timeline-server instead of from file system directly.
+ */
+public class TimelineBasedCkpMetadata extends CkpMetadata {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TimelineBasedCkpMetadata.class);
+
+  private final HttpRequestClient httpRequestClient;
+
+  public TimelineBasedCkpMetadata(FileSystem fs, String basePath, String 
uniqueId, HoodieWriteConfig writeConfig) {
+    super(fs, basePath, uniqueId);
+    this.httpRequestClient = new HttpRequestClient(writeConfig);
+    LOG.info("Timeline server based CkpMetadata enabled");
+  }
+
+  @Override
+  public void startInstant(String instant) {
+    super.startInstant(instant);
+    sendRefreshRequest();
+  }
+
+  @Override
+  public void commitInstant(String instant) {
+    super.commitInstant(instant);
+    sendRefreshRequest();
+  }
+
+  @Override
+  public void abortInstant(String instant) {
+    super.abortInstant(instant);
+    sendRefreshRequest();
+  }
+
+  @Override
+  protected Stream<CkpMessage> fetchCkpMessages(Path ckpMetaPath) throws 
IOException {
+    // Read ckp messages from timeline server
+    Stream<CkpMessage> ckpMessageStream;
+    try {
+      List<InstantStateDTO> instantStateDTOList = 
httpRequestClient.executeRequestWithRetry(
+          InstantStateHandler.ALL_INSTANT_STATE_URL, 
getRequestParams(ckpMetaPath.toString()),
+          new TypeReference<List<InstantStateDTO>>() {
+          }, RequestMethod.GET);
+      ckpMessageStream = instantStateDTOList.stream().map(c -> new 
CkpMessage(c.getInstant(), c.getState()));
+    } catch (Exception e) {
+      LOG.error("Failed to execute scan ckp metadata, fall back to read from 
file system...", e);
+      // If we failed to request timeline server, read ckp messages from file 
system directly.
+      ckpMessageStream = super.fetchCkpMessages(ckpMetaPath);
+    }
+    return ckpMessageStream;
+  }
+
+  private Map<String, String> getRequestParams(String dirPath) {
+    return 
Collections.singletonMap(InstantStateHandler.INSTANT_STATE_DIR_PATH_PARAM, 
dirPath);
+  }
+
+  /**
+   * Refresh the ckp messages that cached in timeline server.
+   */
+  private void sendRefreshRequest() {
+    try {
+      boolean success = httpRequestClient.executeRequestWithRetry(
+          InstantStateHandler.REFRESH_INSTANT_STATE, 
getRequestParams(path.toString()),
+          new TypeReference<Boolean>() {
+          }, RequestMethod.POST);
+      if (!success) {
+        LOG.warn("Timeline server responses with failed refresh");
+      }
+    } catch (Exception e) {
+      // Do not propagate the exception because the server will also do auto 
refresh
+      LOG.error("Failed to execute refresh", e);
+    }
+  }
+
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteWithTimelineBasedCkpMetadata.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteWithTimelineBasedCkpMetadata.java
new file mode 100644
index 00000000000..12c67cfc7b8
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteWithTimelineBasedCkpMetadata.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
/**
 * Test cases for timeline based checkpoint metadata.
 *
 * <p>Re-runs the copy-on-write write tests with the timeline-server-based
 * instant state option enabled, plus a failover case where the timeline
 * server is stopped mid-pipeline.
 */
public class TestWriteWithTimelineBasedCkpMetadata extends TestWriteCopyOnWrite {

  @Override
  protected void setUp(Configuration conf) {
    // Route ckp metadata reads through the embedded timeline server for every inherited test case.
    conf.setBoolean(HoodieWriteConfig.INSTANT_STATE_TIMELINE_SERVER_BASED.key(), true);
  }

  @Test
  public void testTimelineBasedCkpMetadataFailover() throws Exception {
    // reset the config option
    conf.setString(FlinkOptions.OPERATION, "insert");
    conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
    conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
    conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);

    preparePipeline(conf)
        .consume(TestData.DATA_SET_INSERT_SAME_KEY)
        .checkpoint(1)
        // stop the timeline server by close write client
        .stopTimelineServer()
        .handleEvents(1)
        .checkpointComplete(1)
        // will still be able to write and checkpoint
        // (expects the ckp metadata read to fall back to the file system)
        .checkWrittenData(EXPECTED4, 1)
        .consume(TestData.DATA_SET_INSERT_SAME_KEY)
        .checkpoint(2)
        .handleEvents(1)
        .checkpointComplete(2)
        .checkWrittenDataCOW(EXPECTED5)
        .end();
  }

}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
index 1ef2254ff8e..a36810d1301 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
@@ -18,13 +18,14 @@
 
 package org.apache.hudi.sink.meta;
 
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -32,6 +33,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.stream.IntStream;
@@ -49,11 +51,20 @@ public class TestCkpMetadata {
   @TempDir
   File tempFile;
 
+  protected Configuration conf;
+
+  protected HoodieFlinkWriteClient writeClient;
+
   @BeforeEach
   public void beforeEach() throws Exception {
+    setup();
+  }
+
+  protected void setup() throws IOException {
     String basePath = tempFile.getAbsolutePath();
-    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    this.conf = TestConfigurations.getDefaultConf(basePath);
     StreamerUtil.initTableIfNotExists(conf);
+    this.writeClient = FlinkWriteClients.createWriteClient(conf);
   }
 
   @ParameterizedTest
@@ -94,9 +105,16 @@ public class TestCkpMetadata {
         metadata1.getInstantCache(), is(Collections.singletonList("4")));
   }
 
-  private CkpMetadata getCkpMetadata(String uniqueId) {
-    String basePath = tempFile.getAbsolutePath();
-    FileSystem fs = FSUtils.getFs(basePath, 
HadoopConfigurations.getHadoopConf(new Configuration()));
-    return CkpMetadata.getInstance(fs, basePath, uniqueId);
+  protected CkpMetadata getCkpMetadata(String uniqueId) {
+    conf.set(FlinkOptions.WRITE_CLIENT_ID, uniqueId);
+    return CkpMetadataFactory.getCkpMetadata(writeClient.getConfig(), conf);
+  }
+
+  @AfterEach
+  public void cleanup() {
+    if (writeClient != null) {
+      writeClient.close();
+      writeClient = null;
+    }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestTimelineBasedCkpMetadata.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestTimelineBasedCkpMetadata.java
new file mode 100644
index 00000000000..3083e863720
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestTimelineBasedCkpMetadata.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.meta;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test cases for {@link CkpMetadata} when timeline based enabled.
+ */
+public class TestTimelineBasedCkpMetadata extends TestCkpMetadata {
+
+  @Override
+  public void setup() throws IOException {
+    String basePath = tempFile.getAbsolutePath();
+    this.conf = TestConfigurations.getDefaultConf(basePath);
+    
conf.setString(HoodieWriteConfig.INSTANT_STATE_TIMELINE_SERVER_BASED.key(), 
"true");
+    StreamerUtil.initTableIfNotExists(conf);
+    this.writeClient = FlinkWriteClients.createWriteClient(conf);
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {"", "1"})
+  public void testFailOver(String uniqueId) {
+    CkpMetadata metadata = getCkpMetadata(uniqueId);
+    // write and read 5 committed checkpoints
+    IntStream.range(0, 3).forEach(i -> metadata.startInstant(i + ""));
+
+    assertThat(metadata.lastPendingInstant(), is("2"));
+    metadata.commitInstant("2");
+    assertThat(metadata.lastPendingInstant(), equalTo(null));
+
+    // Close write client and timeline server
+    cleanup();
+
+    // When timeline server is not responding, we can still read ckp metadata 
from file system directly
+    // test cleaning
+    IntStream.range(3, 6).forEach(i -> metadata.startInstant(i + ""));
+    assertThat(metadata.getMessages().size(), is(3));
+    // commit and abort instant does not trigger cleaning
+    metadata.commitInstant("6");
+    metadata.abortInstant("7");
+    assertThat(metadata.getMessages().size(), is(5));
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {"", "1"})
+  public void testRefreshEveryNCommits(String uniqueId) {
+    // File system based writing
+    
writeClient.getConfig().setValue(HoodieWriteConfig.INSTANT_STATE_TIMELINE_SERVER_BASED.key(),
 "false");
+    CkpMetadata writeMetadata = getCkpMetadata(uniqueId);
+
+    // Timeline-based reading
+    
writeClient.getConfig().setValue(HoodieWriteConfig.INSTANT_STATE_TIMELINE_SERVER_BASED.key(),
 "true");
+    CkpMetadata readOnlyMetadata = getCkpMetadata(uniqueId);
+
+    // write and read 5 committed checkpoints
+    IntStream.range(0, 3).forEach(i -> writeMetadata.startInstant(i + ""));
+    assertThat(readOnlyMetadata.lastPendingInstant(), is("2"));
+    writeMetadata.commitInstant("2");
+    // Send 10 requests to server
+    readCkpMessagesNTimes(readOnlyMetadata, 10);
+    assertThat(readOnlyMetadata.lastPendingInstant(), equalTo("2"));
+    // Send 100 requests to server, will trigger refresh
+    readCkpMessagesNTimes(readOnlyMetadata, 100);
+    assertThat(readOnlyMetadata.lastPendingInstant(), equalTo(null));
+
+    IntStream.range(3, 6).forEach(i -> writeMetadata.startInstant(i + ""));
+    readCkpMessagesNTimes(readOnlyMetadata, 200);
+    assertThat(readOnlyMetadata.getMessages().size(), is(3));
+
+    writeMetadata.commitInstant("6");
+    writeMetadata.abortInstant("7");
+    readCkpMessagesNTimes(readOnlyMetadata, 200);
+    assertThat(readOnlyMetadata.getMessages().size(), is(5));
+  }
+
+  /**
+   * Send read requests to server to trigger checkpoint metadata refreshing.
+   */
+  private void readCkpMessagesNTimes(CkpMetadata metadata, int maxRetry) {
+    int retry = 0;
+    while (retry < maxRetry) {
+      metadata.getMessages();
+      retry++;
+    }
+  }
+
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index b2d6546e1c1..7332a0ae61a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.sink.utils;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -27,10 +28,12 @@ import 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.meta.CkpMetadata;
+import org.apache.hudi.sink.meta.CkpMetadataFactory;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestUtils;
@@ -144,7 +147,8 @@ public class TestWriteBase {
       this.pipeline = TestData.getWritePipeline(this.basePath, conf);
       // open the function and ingest data
       this.pipeline.openFunction();
-      this.ckpMetadata = CkpMetadata.getInstance(conf);
+      HoodieWriteConfig writeConfig = 
this.pipeline.getCoordinator().getWriteClient().getConfig();
+      this.ckpMetadata = CkpMetadataFactory.getCkpMetadata(writeConfig, conf);
       return this;
     }
 
@@ -260,6 +264,14 @@ public class TestWriteBase {
       return this;
     }
 
+    /**
+     * Stop the timeline server.
+     */
+    public TestHarness stopTimelineServer() {
+      
pipeline.getCoordinator().getWriteClient().getTimelineServer().ifPresent(EmbeddedTimelineService::stop);
+      return this;
+    }
+
     public TestHarness allDataFlushed() {
       Map<String, List<HoodieRecord>> dataBuffer = 
this.pipeline.getDataBuffer();
       assertThat("All data should be flushed out", dataBuffer.size(), is(0));
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index ccc9094e558..345070fbe5c 100644
--- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.marker.MarkerOperation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
+import org.apache.hudi.common.table.timeline.dto.InstantStateDTO;
 import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
 import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
 import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
@@ -38,6 +39,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.timeline.service.handlers.BaseFileHandler;
 import org.apache.hudi.timeline.service.handlers.FileSliceHandler;
+import org.apache.hudi.timeline.service.handlers.InstantStateHandler;
 import org.apache.hudi.timeline.service.handlers.MarkerHandler;
 import org.apache.hudi.timeline.service.handlers.TimelineHandler;
 
@@ -77,6 +79,7 @@ public class RequestHandler {
   private final FileSliceHandler sliceHandler;
   private final BaseFileHandler dataFileHandler;
   private final MarkerHandler markerHandler;
+  private final InstantStateHandler instantStateHandler;
   private final Registry metricsRegistry = 
Registry.getRegistry("TimelineService");
   private ScheduledExecutorService asyncResultService = 
Executors.newSingleThreadScheduledExecutor();
 
@@ -95,6 +98,11 @@ public class RequestHandler {
     } else {
       this.markerHandler = null;
     }
+    if (timelineServiceConfig.enableInstantStateRequests) {
+      this.instantStateHandler = new InstantStateHandler(conf, 
timelineServiceConfig, fileSystem, viewManager);
+    } else {
+      this.instantStateHandler = null;
+    }
     if (timelineServiceConfig.async) {
       asyncResultService = Executors.newSingleThreadScheduledExecutor();
     }
@@ -139,6 +147,9 @@ public class RequestHandler {
     if (markerHandler != null) {
       registerMarkerAPI();
     }
+    if (instantStateHandler != null) {
+      registerInstantStateAPI();
+    }
   }
 
   public void stop() {
@@ -160,7 +171,7 @@ public class RequestHandler {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", 
TimelineHash=" + timelineHashFromClient
           + "], localTimeline=" + localTimeline.getInstants());
-    } 
+    }
 
     if ((!localTimeline.getInstantsAsStream().findAny().isPresent())
         && 
HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) {
@@ -505,6 +516,24 @@ public class RequestHandler {
     }, false));
   }
 
  /**
   * Registers the instant state endpoints:
   * a GET that returns all cached instant states under a directory, and
   * a POST that forces the server-side cache for that directory to refresh.
   */
  private void registerInstantStateAPI() {
    app.get(InstantStateHandler.ALL_INSTANT_STATE_URL, new ViewHandler(ctx -> {
      metricsRegistry.add("ALL_INSTANT_STATE", 1);
      List<InstantStateDTO> instantStates = instantStateHandler.getAllInstantStates(
          ctx.queryParam(InstantStateHandler.INSTANT_STATE_DIR_PATH_PARAM)
      );
      writeValueAsString(ctx, instantStates);
    }, false));

    app.post(InstantStateHandler.REFRESH_INSTANT_STATE, new ViewHandler(ctx -> {
      metricsRegistry.add("REFRESH_INSTANT_STATE", 1);
      boolean success = instantStateHandler.refresh(
          ctx.queryParam(InstantStateHandler.INSTANT_STATE_DIR_PATH_PARAM)
      );
      writeValueAsString(ctx, success);
    }, false));
  }
+
   /**
    * Determine whether to throw an exception when local view of table's 
timeline is behind that of client's view.
    */
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
index 171357f5341..790d476311f 100644
--- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
@@ -114,6 +114,9 @@ public class TimelineService {
     @Parameter(names = {"--enable-marker-requests", "-em"}, description = 
"Enable handling of marker-related requests")
     public boolean enableMarkerRequests = false;
 
+    @Parameter(names = {"--enable-instant-state-requests"}, description = 
"Enable handling of instant state requests")
+    public boolean enableInstantStateRequests = false;
+
     @Parameter(names = {"--marker-batch-threads", "-mbt"}, description = 
"Number of threads to use for batch processing marker creation requests")
     public int markerBatchNumThreads = 20;
 
@@ -158,6 +161,10 @@ public class TimelineService {
             + "Instants whose heartbeat is greater than the current value will 
not be used in early conflict detection.")
     public Long maxAllowableHeartbeatIntervalInMs = 120000L;
 
+    @Parameter(names = {"--instant-state-force-refresh-request-number"}, 
description =
+        "Used for timeline-server-based instant state requests, every N read 
requests will trigger instant state refreshing")
+    public Integer instantStateForceRefreshRequestNumber = 100;
+
     @Parameter(names = {"--help", "-h"})
     public Boolean help = false;
 
@@ -179,6 +186,7 @@ public class TimelineService {
       private boolean async = false;
       private boolean compress = true;
       private boolean enableMarkerRequests = false;
+      private boolean enableInstantStateRequests = false;
       private int markerBatchNumThreads = 20;
       private long markerBatchIntervalMs = 50L;
       private int markerParallelism = 100;
@@ -189,6 +197,8 @@ public class TimelineService {
       private Long asyncConflictDetectorPeriodMs = 30000L;
       private Long maxAllowableHeartbeatIntervalInMs = 120000L;
 
+      private int instantStateForceRefreshRequestNumber = 100;
+
       public Builder() {
       }
 
@@ -287,6 +297,16 @@ public class TimelineService {
         return this;
       }
 
+      public Builder enableInstantStateRequests(boolean 
enableCkpInstantStateRequests) {
+        this.enableInstantStateRequests = enableCkpInstantStateRequests;
+        return this;
+      }
+
+      public Builder instantStateForceRefreshRequestNumber(int 
instantStateForceRefreshRequestNumber) {
+        this.instantStateForceRefreshRequestNumber = 
instantStateForceRefreshRequestNumber;
+        return this;
+      }
+
       public Config build() {
         Config config = new Config();
         config.serverPort = this.serverPort;
@@ -308,6 +328,8 @@ public class TimelineService {
         config.asyncConflictDetectorInitialDelayMs = 
this.asyncConflictDetectorInitialDelayMs;
         config.asyncConflictDetectorPeriodMs = 
this.asyncConflictDetectorPeriodMs;
         config.maxAllowableHeartbeatIntervalInMs = 
this.maxAllowableHeartbeatIntervalInMs;
+        config.enableInstantStateRequests = this.enableInstantStateRequests;
+        config.instantStateForceRefreshRequestNumber = 
this.instantStateForceRefreshRequestNumber;
         return config;
       }
     }
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/InstantStateHandler.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/InstantStateHandler.java
new file mode 100644
index 00000000000..e7dc39acb97
--- /dev/null
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/InstantStateHandler.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.table.timeline.dto.InstantStateDTO;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
/**
 * REST Handler servicing instant state requests.
 * <p>
 * The instant states are cached in the timeline server and are refreshed after the instant states
 * in the file system have changed (via the refresh endpoint, plus a periodic forced refresh).
 */
public class InstantStateHandler extends Handler {

  private static final Logger LOG = LoggerFactory.getLogger(InstantStateHandler.class);

  /**
   * Base url for instant state requests.
   */
  private static final String BASE_URL = "/v1/hoodie/instantstate";

  /**
   * Param for instant state requests: the instant state directory path, which contains a uniqueId
   * for different writers.
   */
  public static final String INSTANT_STATE_DIR_PATH_PARAM = "instantstatedirpath";

  /**
   * GET requests. Returns all the instant states under the given instant state path.
   */
  public static final String ALL_INSTANT_STATE_URL = String.format("%s/%s", BASE_URL, "all");

  /**
   * POST requests. Refreshes the instant state data cached in memory.
   */
  public static final String REFRESH_INSTANT_STATE = String.format("%s/%s", BASE_URL, "refresh/");

  /**
   * Cached instant state data, instant state path -> list of instant states in fs.
   */
  private final ConcurrentHashMap<String, List<InstantStateDTO>> cachedInstantStates;

  /**
   * Number of read requests since the last refresh; drives the periodic forced refresh.
   */
  private final AtomicLong requestCount;

  public InstantStateHandler(Configuration conf, TimelineService.Config timelineServiceConfig, FileSystem fileSystem,
                             FileSystemViewManager viewManager) throws IOException {
    super(conf, timelineServiceConfig, fileSystem, viewManager);
    this.cachedInstantStates = new ConcurrentHashMap<>();
    this.requestCount = new AtomicLong();
  }

  /**
   * Reads instant states from the cache, falling back to the file system on a cache miss.
   *
   * @return Instant states under the input instant state path.
   */
  public List<InstantStateDTO> getAllInstantStates(String instantStatePath) {
    if (requestCount.incrementAndGet() >= timelineServiceConfig.instantStateForceRefreshRequestNumber) {
      // Do refresh for every N requests to ensure the writers won't be blocked forever
      refresh(instantStatePath);
    }
    return cachedInstantStates.computeIfAbsent(instantStatePath, k -> scanInstantState(new Path(k)));
  }

  /**
   * Refreshes the checkpoint messages cached for the given path. Will be called when the
   * coordinator starts/commits/aborts an instant.
   *
   * @return Whether refreshing is successful.
   */
  public boolean refresh(String instantStatePath) {
    try {
      cachedInstantStates.put(instantStatePath, scanInstantState(new Path(instantStatePath)));
      // reset the counter so the forced-refresh window starts over
      requestCount.set(0);
    } catch (Exception e) {
      // best-effort: a failed refresh keeps the stale cache, and callers get `false`
      LOG.error("Failed to load instant states, path: " + instantStatePath, e);
      return false;
    }
    return true;
  }

  /**
   * Scans the instant states from the file system.
   */
  public List<InstantStateDTO> scanInstantState(Path instantStatePath) {
    try {
      // Check instantStatePath exists before list status, see HUDI-5915
      if (this.fileSystem.exists(instantStatePath)) {
        return Arrays.stream(this.fileSystem.listStatus(instantStatePath)).map(InstantStateDTO::fromFileStatus).collect(Collectors.toList());
      } else {
        return Collections.emptyList();
      }
    } catch (IOException e) {
      throw new HoodieIOException("Failed to load instant states, path: " + instantStatePath, e);
    }
  }

}

Reply via email to