This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ce58d02aa36 [HUDI-6336] Support flink timeline-based ckp metadata
(#9651)
ce58d02aa36 is described below
commit ce58d02aa367cc0a030d70e2225bb3cc191d2820
Author: StreamingFlames <[email protected]>
AuthorDate: Thu Sep 28 20:32:32 2023 -0500
[HUDI-6336] Support flink timeline-based ckp metadata (#9651)
---
.../client/embedded/EmbeddedTimelineService.java | 6 +
.../org/apache/hudi/config/HoodieWriteConfig.java | 25 ++++
.../marker/TimelineServerBasedWriteMarkers.java | 56 ++-------
.../org/apache/hudi/util/HttpRequestClient.java | 102 ++++++++++++++++
.../common/table/timeline/dto/InstantStateDTO.java | 68 +++++++++++
.../hudi/sink/StreamWriteOperatorCoordinator.java | 8 +-
.../hudi/sink/bootstrap/BootstrapOperator.java | 3 +-
.../hudi/sink/bulk/BulkInsertWriteFunction.java | 3 +-
.../sink/common/AbstractStreamWriteFunction.java | 3 +-
.../org/apache/hudi/sink/meta/CkpMetadata.java | 33 ++----
.../apache/hudi/sink/meta/CkpMetadataFactory.java | 43 +++++++
.../hudi/sink/meta/TimelineBasedCkpMetadata.java | 112 ++++++++++++++++++
.../TestWriteWithTimelineBasedCkpMetadata.java | 63 ++++++++++
.../org/apache/hudi/sink/meta/TestCkpMetadata.java | 34 ++++--
.../sink/meta/TestTimelineBasedCkpMetadata.java | 117 ++++++++++++++++++
.../org/apache/hudi/sink/utils/TestWriteBase.java | 14 ++-
.../hudi/timeline/service/RequestHandler.java | 31 ++++-
.../hudi/timeline/service/TimelineService.java | 22 ++++
.../service/handlers/InstantStateHandler.java | 131 +++++++++++++++++++++
19 files changed, 789 insertions(+), 85 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
index 7d794366ba0..feda7d2b185 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
@@ -100,6 +100,12 @@ public class EmbeddedTimelineService {
* writeConfig.getHoodieClientHeartbeatTolerableMisses());
}
+ if (writeConfig.isTimelineServerBasedInstantStateEnabled()) {
+ timelineServiceConfBuilder
+
.instantStateForceRefreshRequestNumber(writeConfig.getTimelineServerBasedInstantStateForceRefreshRequestNumber())
+ .enableInstantStateRequests(true);
+ }
+
server = new TimelineService(context, hadoopConf.newCopy(),
timelineServiceConfBuilder.build(),
FSUtils.getFs(basePath, hadoopConf.newCopy()), viewManager);
serverPort = server.startService();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 367f9388726..87734c042d3 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -428,6 +428,18 @@ public class HoodieWriteConfig extends HoodieConfig {
.sinceVersion("0.9.0")
.withDocumentation(MarkerType.class);
+ public static final ConfigProperty<Boolean>
INSTANT_STATE_TIMELINE_SERVER_BASED = ConfigProperty
+ .key("hoodie.instant_state.timeline_server_based.enabled")
+ .defaultValue(false)
+ .sinceVersion("1.0.0")
+ .withDocumentation("If enabled, writers get instant state from timeline
server rather than requesting DFS directly");
+
+ public static final ConfigProperty<Integer>
INSTANT_STATE_TIMELINE_SERVER_BASED_FORCE_REFRESH_REQUEST_NUMBER =
ConfigProperty
+
.key("hoodie.instant_state.timeline_server_based.force_refresh.request.number")
+ .defaultValue(100)
+ .sinceVersion("1.0.0")
+ .withDocumentation("Number of requests to trigger instant state cache
refreshing");
+
public static final ConfigProperty<Integer>
MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS = ConfigProperty
.key("hoodie.markers.timeline_server_based.batch.num_threads")
.defaultValue(20)
@@ -1355,6 +1367,14 @@ public class HoodieWriteConfig extends HoodieConfig {
return getInt(FINALIZE_WRITE_PARALLELISM_VALUE);
}
+ public boolean isTimelineServerBasedInstantStateEnabled() {
+ return getBoolean(INSTANT_STATE_TIMELINE_SERVER_BASED);
+ }
+
+ public int getTimelineServerBasedInstantStateForceRefreshRequestNumber() {
+ return
getInt(INSTANT_STATE_TIMELINE_SERVER_BASED_FORCE_REFRESH_REQUEST_NUMBER);
+ }
+
public MarkerType getMarkersType() {
String markerType = getString(MARKERS_TYPE);
return MarkerType.valueOf(markerType.toUpperCase());
@@ -2930,6 +2950,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withTimelineServerBasedInstantStateEnable(boolean enable) {
+ writeConfig.setValue(INSTANT_STATE_TIMELINE_SERVER_BASED,
String.valueOf(enable));
+ return this;
+ }
+
public Builder withBulkInsertSortMode(String mode) {
writeConfig.setValue(BULK_INSERT_SORT_MODE, mode);
return this;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
index 9d6b7f9b9a9..a93d4fc91fb 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
@@ -29,13 +29,11 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
import org.apache.hudi.exception.HoodieRemoteException;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.util.HttpRequestClient;
+import org.apache.hudi.util.HttpRequestClient.RequestMethod;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.fs.Path;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.client.fluent.Response;
-import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,10 +62,8 @@ import static
org.apache.hudi.common.table.marker.MarkerOperation.MARKER_NAME_PA
*/
public class TimelineServerBasedWriteMarkers extends WriteMarkers {
private static final Logger LOG =
LoggerFactory.getLogger(TimelineServerBasedWriteMarkers.class);
- private final ObjectMapper mapper;
- private final String timelineServerHost;
- private final int timelineServerPort;
- private final int timeoutSecs;
+
+ private final HttpRequestClient httpRequestClient;
public TimelineServerBasedWriteMarkers(HoodieTable table, String
instantTime) {
this(table.getMetaClient().getBasePath(),
@@ -80,17 +76,14 @@ public class TimelineServerBasedWriteMarkers extends
WriteMarkers {
TimelineServerBasedWriteMarkers(String basePath, String markerFolderPath,
String instantTime,
String timelineServerHost, int
timelineServerPort, int timeoutSecs) {
super(basePath, markerFolderPath, instantTime);
- this.mapper = new ObjectMapper();
- this.timelineServerHost = timelineServerHost;
- this.timelineServerPort = timelineServerPort;
- this.timeoutSecs = timeoutSecs;
+ this.httpRequestClient = new HttpRequestClient(timelineServerHost,
timelineServerPort, timeoutSecs, 0);
}
@Override
public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism)
{
Map<String, String> paramsMap =
Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
- return executeRequestToTimelineServer(
+ return httpRequestClient.executeRequest(
DELETE_MARKER_DIR_URL, paramsMap, new TypeReference<Boolean>() {},
RequestMethod.POST);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to delete marker directory " +
markerDirPath.toString(), e);
@@ -101,7 +94,7 @@ public class TimelineServerBasedWriteMarkers extends
WriteMarkers {
public boolean doesMarkerDirExist() {
Map<String, String> paramsMap =
Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
- return executeRequestToTimelineServer(
+ return httpRequestClient.executeRequest(
MARKERS_DIR_EXISTS_URL, paramsMap, new TypeReference<Boolean>() {},
RequestMethod.GET);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to check marker directory " +
markerDirPath.toString(), e);
@@ -112,7 +105,7 @@ public class TimelineServerBasedWriteMarkers extends
WriteMarkers {
public Set<String> createdAndMergedDataPaths(HoodieEngineContext context,
int parallelism) throws IOException {
Map<String, String> paramsMap =
Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
- Set<String> markerPaths = executeRequestToTimelineServer(
+ Set<String> markerPaths = httpRequestClient.executeRequest(
CREATE_AND_MERGE_MARKERS_URL, paramsMap, new
TypeReference<Set<String>>() {}, RequestMethod.GET);
return
markerPaths.stream().map(WriteMarkers::stripMarkerSuffix).collect(Collectors.toSet());
} catch (IOException e) {
@@ -125,7 +118,7 @@ public class TimelineServerBasedWriteMarkers extends
WriteMarkers {
public Set<String> allMarkerFilePaths() {
Map<String, String> paramsMap =
Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
- return executeRequestToTimelineServer(
+ return httpRequestClient.executeRequest(
ALL_MARKERS_URL, paramsMap, new TypeReference<Set<String>>() {},
RequestMethod.GET);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to get all markers in " +
markerDirPath.toString(), e);
@@ -179,9 +172,9 @@ public class TimelineServerBasedWriteMarkers extends
WriteMarkers {
private boolean executeCreateMarkerRequest(Map<String, String> paramsMap,
String partitionPath, String markerFileName) {
boolean success;
try {
- success = executeRequestToTimelineServer(
+ success = httpRequestClient.executeRequest(
CREATE_MARKER_URL, paramsMap, new TypeReference<Boolean>() {
- }, RequestMethod.POST);
+ }, HttpRequestClient.RequestMethod.POST);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to create marker file " +
partitionPath + "/" + markerFileName, e);
}
@@ -212,31 +205,4 @@ public class TimelineServerBasedWriteMarkers extends
WriteMarkers {
return paramsMap;
}
- private <T> T executeRequestToTimelineServer(String requestPath, Map<String,
String> queryParameters,
- TypeReference reference,
RequestMethod method) throws IOException {
- URIBuilder builder =
- new
URIBuilder().setHost(timelineServerHost).setPort(timelineServerPort).setPath(requestPath).setScheme("http");
-
- queryParameters.forEach(builder::addParameter);
-
- String url = builder.toString();
- LOG.debug("Sending request : (" + url + ")");
- Response response;
- int timeout = this.timeoutSecs * 1000; // msec
- switch (method) {
- case GET:
- response =
Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
- break;
- case POST:
- default:
- response =
Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute();
- break;
- }
- String content = response.returnContent().asString();
- return (T) mapper.readValue(content, reference);
- }
-
- private enum RequestMethod {
- GET, POST
- }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/HttpRequestClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/HttpRequestClient.java
new file mode 100644
index 00000000000..65131cc7742
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/HttpRequestClient.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Helper class for executing timeline server requests.
+ */
+public class HttpRequestClient {
+ private static final Logger LOG =
LoggerFactory.getLogger(HttpRequestClient.class);
+ private final ObjectMapper mapper;
+ private final String serverHost;
+ private final int serverPort;
+ private final int timeoutSecs;
+ private final int maxRetry;
+
+ public HttpRequestClient(HoodieWriteConfig writeConfig) {
+ this(writeConfig.getViewStorageConfig().getRemoteViewServerHost(),
+ writeConfig.getViewStorageConfig().getRemoteViewServerPort(),
+
writeConfig.getViewStorageConfig().getRemoteTimelineClientTimeoutSecs(),
+
writeConfig.getViewStorageConfig().getRemoteTimelineClientMaxRetryNumbers());
+ }
+
+ public HttpRequestClient(String serverHost, int serverPort, int timeoutSecs,
int maxRetry) {
+ this.mapper = new ObjectMapper();
+ this.serverHost = serverHost;
+ this.serverPort = serverPort;
+ this.timeoutSecs = timeoutSecs;
+ this.maxRetry = maxRetry;
+ }
+
+ public <T> T executeRequestWithRetry(String requestPath, Map<String, String>
queryParameters,
+ TypeReference reference, RequestMethod
method) {
+ int retry = maxRetry;
+ while (--retry >= 0) {
+ try {
+ return executeRequest(requestPath, queryParameters, reference, method);
+ } catch (IOException e) {
+ LOG.warn("Failed to execute request (" + requestPath + ") to timeline
server", e);
+ }
+ }
+ throw new HoodieException("Failed to execute timeline server request (" +
requestPath + ")");
+ }
+
+ public <T> T executeRequest(String requestPath, Map<String, String>
queryParameters,
+ TypeReference reference, RequestMethod method)
throws IOException {
+ URIBuilder builder =
+ new
URIBuilder().setHost(serverHost).setPort(serverPort).setPath(requestPath).setScheme("http");
+
+ queryParameters.forEach(builder::addParameter);
+
+ String url = builder.toString();
+ LOG.debug("Sending request : (" + url + ")");
+ Response response;
+ int timeout = this.timeoutSecs * 1000; // msec
+ switch (method) {
+ case GET:
+ response =
Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
+ break;
+ case POST:
+ default:
+ response =
Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute();
+ break;
+ }
+ String content = response.returnContent().asString();
+ return (T) mapper.readValue(content, reference);
+ }
+
+ public enum RequestMethod {
+ GET, POST
+ }
+
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/InstantStateDTO.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/InstantStateDTO.java
new file mode 100644
index 00000000000..832f4aa4b8e
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/InstantStateDTO.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline.dto;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.FileStatus;
+
+/**
+ * Data transfer object for instant state.
+ *
+ * see org.apache.hudi.sink.meta.CkpMessage.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class InstantStateDTO {
+
+ /**
+ * The instant time.
+ *
+ * @see org.apache.hudi.sink.meta.CkpMessage#instant
+ */
+ @JsonProperty("instant")
+ String instant;
+
+ /**
+ * The instant state.
+ *
+ * @see org.apache.hudi.sink.meta.CkpMessage#state
+ */
+ @JsonProperty("state")
+ String state;
+
+ public static InstantStateDTO fromFileStatus(FileStatus fileStatus) {
+ InstantStateDTO ret = new InstantStateDTO();
+ String fileName = fileStatus.getPath().getName();
+ String[] nameAndExt = fileName.split("\\.");
+ ValidationUtils.checkState(nameAndExt.length == 2);
+ ret.instant = nameAndExt[0];
+ ret.state = nameAndExt[1];
+ return ret;
+ }
+
+ public String getInstant() {
+ return instant;
+ }
+
+ public String getState() {
+ return state;
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 34d8322dd9d..c580d43c351 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
@@ -37,6 +38,7 @@ import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
+import org.apache.hudi.sink.meta.CkpMetadataFactory;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.ClientIds;
@@ -186,9 +188,9 @@ public class StreamWriteOperatorCoordinator
this.gateways = new SubtaskGateway[this.parallelism];
// init table, create if not exists.
this.metaClient = initTableIfNotExists(this.conf);
- this.ckpMetadata = initCkpMetadata(this.metaClient, this.conf);
// the write client must create after the table creation
this.writeClient = FlinkWriteClients.createWriteClient(conf);
+ this.ckpMetadata = initCkpMetadata(writeClient.getConfig(), this.conf);
initMetadataTable(this.writeClient);
this.tableState = TableState.create(conf);
// start the executor
@@ -349,8 +351,8 @@ public class StreamWriteOperatorCoordinator
writeClient.initMetadataTable();
}
- private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient,
Configuration conf) throws IOException {
- CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient,
conf.getString(FlinkOptions.WRITE_CLIENT_ID));
+ private static CkpMetadata initCkpMetadata(HoodieWriteConfig writeConfig,
Configuration conf) throws IOException {
+ CkpMetadata ckpMetadata = CkpMetadataFactory.getCkpMetadata(writeConfig,
conf);
ckpMetadata.bootstrap();
return ckpMetadata;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 1bdfeb7296b..1c6c7fca650 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -40,6 +40,7 @@ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
import org.apache.hudi.sink.meta.CkpMetadata;
+import org.apache.hudi.sink.meta.CkpMetadataFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.util.FlinkTables;
@@ -131,7 +132,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf,
true);
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf,
getRuntimeContext());
- this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient(),
this.conf.getString(FlinkOptions.WRITE_CLIENT_ID));
+ this.ckpMetadata = CkpMetadataFactory.getCkpMetadata(writeConfig, conf);
this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
preLoadIndexRecords();
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index ec4e56c2d1d..d44ef25ee4b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -26,6 +26,7 @@ import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
+import org.apache.hudi.sink.meta.CkpMetadataFactory;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.FlinkWriteClients;
@@ -113,7 +114,7 @@ public class BulkInsertWriteFunction<I>
public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = FlinkWriteClients.createWriteClient(this.config,
getRuntimeContext());
- this.ckpMetadata = CkpMetadata.getInstance(config);
+ this.ckpMetadata =
CkpMetadataFactory.getCkpMetadata(writeClient.getConfig(), config);
this.initInstant = lastPendingInstant();
sendBootstrapEvent();
initWriterHelper();
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index 3bd19fa0699..e0bb71b90c7 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -28,6 +28,7 @@ import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
+import org.apache.hudi.sink.meta.CkpMetadataFactory;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
@@ -148,7 +149,7 @@ public abstract class AbstractStreamWriteFunction<I>
TypeInformation.of(WriteMetadataEvent.class)
));
- this.ckpMetadata = CkpMetadata.getInstance(this.metaClient,
this.config.getString(FlinkOptions.WRITE_CLIENT_ID));
+ this.ckpMetadata =
CkpMetadataFactory.getCkpMetadata(writeClient.getConfig(), config);
this.currentInstant = lastPendingInstant();
if (context.isRestored()) {
restoreWriteMetadata();
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index 9b0457845e9..90636bf6ac5 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -18,16 +18,12 @@
package org.apache.hudi.sink.meta;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
-import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -41,6 +37,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* The checkpoint metadata for bookkeeping the checkpoint messages.
@@ -76,12 +73,7 @@ public class CkpMetadata implements Serializable,
AutoCloseable {
private List<CkpMessage> messages;
private List<String> instantCache;
- private CkpMetadata(Configuration config) {
- this(FSUtils.getFs(config.getString(FlinkOptions.PATH),
HadoopConfigurations.getHadoopConf(config)),
- config.getString(FlinkOptions.PATH),
config.getString(FlinkOptions.WRITE_CLIENT_ID));
- }
-
- private CkpMetadata(FileSystem fs, String basePath, String uniqueId) {
+ CkpMetadata(FileSystem fs, String basePath, String uniqueId) {
this.fs = fs;
this.path = new Path(ckpMetaPath(basePath, uniqueId));
}
@@ -211,17 +203,6 @@ public class CkpMetadata implements Serializable,
AutoCloseable {
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
- public static CkpMetadata getInstance(Configuration config) {
- return new CkpMetadata(config);
- }
-
- public static CkpMetadata getInstance(HoodieTableMetaClient metaClient,
String uniqueId) {
- return new CkpMetadata(metaClient.getFs(), metaClient.getBasePath(),
uniqueId);
- }
-
- public static CkpMetadata getInstance(FileSystem fs, String basePath, String
uniqueId) {
- return new CkpMetadata(fs, basePath, uniqueId);
- }
protected static String ckpMetaPath(String basePath, String uniqueId) {
// .hoodie/.aux/ckp_meta
@@ -233,12 +214,16 @@ public class CkpMetadata implements Serializable,
AutoCloseable {
return new Path(path, fileName);
}
- private List<CkpMessage> scanCkpMetadata(Path ckpMetaPath) throws
IOException {
+ protected Stream<CkpMessage> fetchCkpMessages(Path ckpMetaPath) throws
IOException {
// This is required when the storage is minio
if (!this.fs.exists(ckpMetaPath)) {
- return new ArrayList<>();
+ return Stream.empty();
}
- return Arrays.stream(this.fs.listStatus(ckpMetaPath)).map(CkpMessage::new)
+ return Arrays.stream(this.fs.listStatus(ckpMetaPath)).map(CkpMessage::new);
+ }
+
+ protected List<CkpMessage> scanCkpMetadata(Path ckpMetaPath) throws
IOException {
+ return fetchCkpMessages(ckpMetaPath)
.collect(Collectors.groupingBy(CkpMessage::getInstant)).values().stream()
.map(messages -> messages.stream().reduce((x, y) -> {
// Pick the one with the highest state
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadataFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadataFactory.java
new file mode 100644
index 00000000000..852f8a1b260
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadataFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.meta;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * A factory to generate {@link CkpMetadata} instance based on whether {@link
HoodieWriteConfig#INSTANT_STATE_TIMELINE_SERVER_BASED} enabled.
+ */
+public class CkpMetadataFactory {
+ public static CkpMetadata getCkpMetadata(HoodieWriteConfig writeConfig,
Configuration conf) {
+ FileSystem fs = FSUtils.getFs(conf.getString(FlinkOptions.PATH),
HadoopConfigurations.getHadoopConf(conf));
+ String basePath = conf.getString(FlinkOptions.PATH);
+ String uniqueId = conf.getString(FlinkOptions.WRITE_CLIENT_ID);
+ if (writeConfig.isEmbeddedTimelineServerEnabled() &&
writeConfig.isTimelineServerBasedInstantStateEnabled()) {
+ return new TimelineBasedCkpMetadata(fs, basePath, uniqueId, writeConfig);
+ } else {
+ return new CkpMetadata(fs, basePath, uniqueId);
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/TimelineBasedCkpMetadata.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/TimelineBasedCkpMetadata.java
new file mode 100644
index 00000000000..44229f36d44
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/TimelineBasedCkpMetadata.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.meta;
+
+import org.apache.hudi.common.table.timeline.dto.InstantStateDTO;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.timeline.service.handlers.InstantStateHandler;
+import org.apache.hudi.util.HttpRequestClient;
+import org.apache.hudi.util.HttpRequestClient.RequestMethod;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
/**
 * Timeline server based CkpMetadata, will read ckpMessages from timeline-server instead of from file system directly.
 *
 * <p>Writes still go through the parent {@link CkpMetadata} (file system); after every state
 * transition a refresh request is sent so the server-side cache stays in sync with the files.
 * Reads prefer the timeline server and fall back to direct file-system listing when any
 * request fails, so a stopped server does not block the writer.
 */
public class TimelineBasedCkpMetadata extends CkpMetadata {

  private static final Logger LOG = LoggerFactory.getLogger(TimelineBasedCkpMetadata.class);

  // Client used to talk to the embedded timeline server; retry behavior lives inside it.
  private final HttpRequestClient httpRequestClient;

  public TimelineBasedCkpMetadata(FileSystem fs, String basePath, String uniqueId, HoodieWriteConfig writeConfig) {
    super(fs, basePath, uniqueId);
    this.httpRequestClient = new HttpRequestClient(writeConfig);
    LOG.info("Timeline server based CkpMetadata enabled");
  }

  @Override
  public void startInstant(String instant) {
    super.startInstant(instant);
    // Keep the server-side cache in sync with the newly written instant state file.
    sendRefreshRequest();
  }

  @Override
  public void commitInstant(String instant) {
    super.commitInstant(instant);
    sendRefreshRequest();
  }

  @Override
  public void abortInstant(String instant) {
    super.abortInstant(instant);
    sendRefreshRequest();
  }

  /**
   * Fetches the checkpoint messages from the timeline server; on any failure falls back to
   * the parent's direct file-system scan so reads keep working without the server.
   */
  @Override
  protected Stream<CkpMessage> fetchCkpMessages(Path ckpMetaPath) throws IOException {
    // Read ckp messages from timeline server
    Stream<CkpMessage> ckpMessageStream;
    try {
      List<InstantStateDTO> instantStateDTOList = httpRequestClient.executeRequestWithRetry(
          InstantStateHandler.ALL_INSTANT_STATE_URL, getRequestParams(ckpMetaPath.toString()),
          new TypeReference<List<InstantStateDTO>>() {
          }, RequestMethod.GET);
      ckpMessageStream = instantStateDTOList.stream().map(c -> new CkpMessage(c.getInstant(), c.getState()));
    } catch (Exception e) {
      LOG.error("Failed to execute scan ckp metadata, fall back to read from file system...", e);
      // If we failed to request timeline server, read ckp messages from file system directly.
      ckpMessageStream = super.fetchCkpMessages(ckpMetaPath);
    }
    return ckpMessageStream;
  }

  // Builds the single query parameter carrying the instant-state directory path.
  private Map<String, String> getRequestParams(String dirPath) {
    return Collections.singletonMap(InstantStateHandler.INSTANT_STATE_DIR_PATH_PARAM, dirPath);
  }

  /**
   * Refresh the ckp messages that cached in timeline server.
   *
   * <p>Failures are logged but never propagated: the server also refreshes its cache on its
   * own after a bounded number of read requests, so a lost refresh is eventually recovered.
   */
  private void sendRefreshRequest() {
    try {
      boolean success = httpRequestClient.executeRequestWithRetry(
          InstantStateHandler.REFRESH_INSTANT_STATE, getRequestParams(path.toString()),
          new TypeReference<Boolean>() {
          }, RequestMethod.POST);
      if (!success) {
        LOG.warn("Timeline server responses with failed refresh");
      }
    } catch (Exception e) {
      // Do not propagate the exception because the server will also do auto refresh
      LOG.error("Failed to execute refresh", e);
    }
  }

}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteWithTimelineBasedCkpMetadata.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteWithTimelineBasedCkpMetadata.java
new file mode 100644
index 00000000000..12c67cfc7b8
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteWithTimelineBasedCkpMetadata.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
/**
 * Test cases for timeline based checkpoint metadata.
 *
 * <p>Reuses every case from {@code TestWriteCopyOnWrite} with the timeline-server-based
 * instant state switched on, plus a dedicated failover case below.
 */
public class TestWriteWithTimelineBasedCkpMetadata extends TestWriteCopyOnWrite {

  @Override
  protected void setUp(Configuration conf) {
    // Enable the timeline-server-based instant state for all inherited test cases.
    conf.setBoolean(HoodieWriteConfig.INSTANT_STATE_TIMELINE_SERVER_BASED.key(), true);
  }

  /**
   * Verifies that writing keeps working after the embedded timeline server is stopped:
   * the ckp metadata falls back to direct file-system access.
   */
  @Test
  public void testTimelineBasedCkpMetadataFailover() throws Exception {
    // reset the config option
    conf.setString(FlinkOptions.OPERATION, "insert");
    conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
    conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
    conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);

    preparePipeline(conf)
        .consume(TestData.DATA_SET_INSERT_SAME_KEY)
        .checkpoint(1)
        // stop the timeline server by close write client
        .stopTimelineServer()
        .handleEvents(1)
        .checkpointComplete(1)
        // will still be able to write and checkpoint
        .checkWrittenData(EXPECTED4, 1)
        .consume(TestData.DATA_SET_INSERT_SAME_KEY)
        .checkpoint(2)
        .handleEvents(1)
        .checkpointComplete(2)
        .checkWrittenDataCOW(EXPECTED5)
        .end();
  }

}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
index 1ef2254ff8e..a36810d1301 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
@@ -18,13 +18,14 @@
package org.apache.hudi.sink.meta;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.flink.configuration.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -32,6 +33,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.stream.IntStream;
@@ -49,11 +51,20 @@ public class TestCkpMetadata {
@TempDir
File tempFile;
+ protected Configuration conf;
+
+ protected HoodieFlinkWriteClient writeClient;
+
@BeforeEach
public void beforeEach() throws Exception {
+ setup();
+ }
+
+ protected void setup() throws IOException {
String basePath = tempFile.getAbsolutePath();
- Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ this.conf = TestConfigurations.getDefaultConf(basePath);
StreamerUtil.initTableIfNotExists(conf);
+ this.writeClient = FlinkWriteClients.createWriteClient(conf);
}
@ParameterizedTest
@@ -94,9 +105,16 @@ public class TestCkpMetadata {
metadata1.getInstantCache(), is(Collections.singletonList("4")));
}
- private CkpMetadata getCkpMetadata(String uniqueId) {
- String basePath = tempFile.getAbsolutePath();
- FileSystem fs = FSUtils.getFs(basePath,
HadoopConfigurations.getHadoopConf(new Configuration()));
- return CkpMetadata.getInstance(fs, basePath, uniqueId);
+ protected CkpMetadata getCkpMetadata(String uniqueId) {
+ conf.set(FlinkOptions.WRITE_CLIENT_ID, uniqueId);
+ return CkpMetadataFactory.getCkpMetadata(writeClient.getConfig(), conf);
+ }
+
+ @AfterEach
+ public void cleanup() {
+ if (writeClient != null) {
+ writeClient.close();
+ writeClient = null;
+ }
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestTimelineBasedCkpMetadata.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestTimelineBasedCkpMetadata.java
new file mode 100644
index 00000000000..3083e863720
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestTimelineBasedCkpMetadata.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.meta;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
/**
 * Test cases for {@link CkpMetadata} when timeline based enabled.
 */
public class TestTimelineBasedCkpMetadata extends TestCkpMetadata {

  @Override
  public void setup() throws IOException {
    String basePath = tempFile.getAbsolutePath();
    this.conf = TestConfigurations.getDefaultConf(basePath);
    // Enable the timeline-server-based instant state so getCkpMetadata() returns the
    // timeline based implementation.
    conf.setString(HoodieWriteConfig.INSTANT_STATE_TIMELINE_SERVER_BASED.key(), "true");
    StreamerUtil.initTableIfNotExists(conf);
    this.writeClient = FlinkWriteClients.createWriteClient(conf);
  }

  /**
   * Verifies that the metadata still works when the timeline server is gone:
   * reads fall back to the file system.
   */
  @ParameterizedTest
  @ValueSource(strings = {"", "1"})
  public void testFailOver(String uniqueId) {
    CkpMetadata metadata = getCkpMetadata(uniqueId);
    // start 3 instants; only the latest one stays pending
    IntStream.range(0, 3).forEach(i -> metadata.startInstant(i + ""));

    assertThat(metadata.lastPendingInstant(), is("2"));
    metadata.commitInstant("2");
    assertThat(metadata.lastPendingInstant(), equalTo(null));

    // Close write client and timeline server
    cleanup();

    // When timeline server is not responding, we can still read ckp metadata from file system directly
    // test cleaning
    IntStream.range(3, 6).forEach(i -> metadata.startInstant(i + ""));
    assertThat(metadata.getMessages().size(), is(3));
    // commit and abort instant does not trigger cleaning
    metadata.commitInstant("6");
    metadata.abortInstant("7");
    assertThat(metadata.getMessages().size(), is(5));
  }

  /**
   * Verifies the server-side forced refresh: a read-only client that never writes still
   * observes new instant states after enough read requests trigger a cache refresh.
   */
  @ParameterizedTest
  @ValueSource(strings = {"", "1"})
  public void testRefreshEveryNCommits(String uniqueId) {
    // File system based writing
    writeClient.getConfig().setValue(HoodieWriteConfig.INSTANT_STATE_TIMELINE_SERVER_BASED.key(), "false");
    CkpMetadata writeMetadata = getCkpMetadata(uniqueId);

    // Timeline-based reading
    writeClient.getConfig().setValue(HoodieWriteConfig.INSTANT_STATE_TIMELINE_SERVER_BASED.key(), "true");
    CkpMetadata readOnlyMetadata = getCkpMetadata(uniqueId);

    // start 3 instants through the file-system writer
    IntStream.range(0, 3).forEach(i -> writeMetadata.startInstant(i + ""));
    assertThat(readOnlyMetadata.lastPendingInstant(), is("2"));
    writeMetadata.commitInstant("2");
    // Send 10 requests to server
    readCkpMessagesNTimes(readOnlyMetadata, 10);
    // Below the forced-refresh threshold the server cache is stale: "2" still looks pending.
    assertThat(readOnlyMetadata.lastPendingInstant(), equalTo("2"));
    // Send 100 requests to server, will trigger refresh
    readCkpMessagesNTimes(readOnlyMetadata, 100);
    assertThat(readOnlyMetadata.lastPendingInstant(), equalTo(null));

    IntStream.range(3, 6).forEach(i -> writeMetadata.startInstant(i + ""));
    readCkpMessagesNTimes(readOnlyMetadata, 200);
    assertThat(readOnlyMetadata.getMessages().size(), is(3));

    writeMetadata.commitInstant("6");
    writeMetadata.abortInstant("7");
    readCkpMessagesNTimes(readOnlyMetadata, 200);
    assertThat(readOnlyMetadata.getMessages().size(), is(5));
  }

  /**
   * Send read requests to server to trigger checkpoint metadata refreshing.
   */
  private void readCkpMessagesNTimes(CkpMetadata metadata, int maxRetry) {
    int retry = 0;
    while (retry < maxRetry) {
      metadata.getMessages();
      retry++;
    }
  }

}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index b2d6546e1c1..7332a0ae61a 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -19,6 +19,7 @@
package org.apache.hudi.sink.utils;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -27,10 +28,12 @@ import
org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
+import org.apache.hudi.sink.meta.CkpMetadataFactory;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
@@ -144,7 +147,8 @@ public class TestWriteBase {
this.pipeline = TestData.getWritePipeline(this.basePath, conf);
// open the function and ingest data
this.pipeline.openFunction();
- this.ckpMetadata = CkpMetadata.getInstance(conf);
+ HoodieWriteConfig writeConfig =
this.pipeline.getCoordinator().getWriteClient().getConfig();
+ this.ckpMetadata = CkpMetadataFactory.getCkpMetadata(writeConfig, conf);
return this;
}
@@ -260,6 +264,14 @@ public class TestWriteBase {
return this;
}
+ /**
+ * Stop the timeline server.
+ */
+ public TestHarness stopTimelineServer() {
+
pipeline.getCoordinator().getWriteClient().getTimelineServer().ifPresent(EmbeddedTimelineService::stop);
+ return this;
+ }
+
public TestHarness allDataFlushed() {
Map<String, List<HoodieRecord>> dataBuffer =
this.pipeline.getDataBuffer();
assertThat("All data should be flushed out", dataBuffer.size(), is(0));
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index ccc9094e558..345070fbe5c 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.marker.MarkerOperation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
+import org.apache.hudi.common.table.timeline.dto.InstantStateDTO;
import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
@@ -38,6 +39,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.timeline.service.handlers.BaseFileHandler;
import org.apache.hudi.timeline.service.handlers.FileSliceHandler;
+import org.apache.hudi.timeline.service.handlers.InstantStateHandler;
import org.apache.hudi.timeline.service.handlers.MarkerHandler;
import org.apache.hudi.timeline.service.handlers.TimelineHandler;
@@ -77,6 +79,7 @@ public class RequestHandler {
private final FileSliceHandler sliceHandler;
private final BaseFileHandler dataFileHandler;
private final MarkerHandler markerHandler;
+ private final InstantStateHandler instantStateHandler;
private final Registry metricsRegistry =
Registry.getRegistry("TimelineService");
private ScheduledExecutorService asyncResultService =
Executors.newSingleThreadScheduledExecutor();
@@ -95,6 +98,11 @@ public class RequestHandler {
} else {
this.markerHandler = null;
}
+ if (timelineServiceConfig.enableInstantStateRequests) {
+ this.instantStateHandler = new InstantStateHandler(conf,
timelineServiceConfig, fileSystem, viewManager);
+ } else {
+ this.instantStateHandler = null;
+ }
if (timelineServiceConfig.async) {
asyncResultService = Executors.newSingleThreadScheduledExecutor();
}
@@ -139,6 +147,9 @@ public class RequestHandler {
if (markerHandler != null) {
registerMarkerAPI();
}
+ if (instantStateHandler != null) {
+ registerInstantStateAPI();
+ }
}
public void stop() {
@@ -160,7 +171,7 @@ public class RequestHandler {
if (LOG.isDebugEnabled()) {
LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ",
TimelineHash=" + timelineHashFromClient
+ "], localTimeline=" + localTimeline.getInstants());
- }
+ }
if ((!localTimeline.getInstantsAsStream().findAny().isPresent())
&&
HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) {
@@ -505,6 +516,24 @@ public class RequestHandler {
}, false));
}
+ private void registerInstantStateAPI() {
+ app.get(InstantStateHandler.ALL_INSTANT_STATE_URL, new ViewHandler(ctx -> {
+ metricsRegistry.add("ALL_INSTANT_STATE", 1);
+ List<InstantStateDTO> instantStates =
instantStateHandler.getAllInstantStates(
+ ctx.queryParam(InstantStateHandler.INSTANT_STATE_DIR_PATH_PARAM)
+ );
+ writeValueAsString(ctx, instantStates);
+ }, false));
+
+ app.post(InstantStateHandler.REFRESH_INSTANT_STATE, new ViewHandler(ctx ->
{
+ metricsRegistry.add("REFRESH_INSTANT_STATE", 1);
+ boolean success = instantStateHandler.refresh(
+ ctx.queryParam(InstantStateHandler.INSTANT_STATE_DIR_PATH_PARAM)
+ );
+ writeValueAsString(ctx, success);
+ }, false));
+ }
+
/**
* Determine whether to throw an exception when local view of table's
timeline is behind that of client's view.
*/
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
index 171357f5341..790d476311f 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
@@ -114,6 +114,9 @@ public class TimelineService {
@Parameter(names = {"--enable-marker-requests", "-em"}, description =
"Enable handling of marker-related requests")
public boolean enableMarkerRequests = false;
+ @Parameter(names = {"--enable-instant-state-requests"}, description =
"Enable handling of instant state requests")
+ public boolean enableInstantStateRequests = false;
+
@Parameter(names = {"--marker-batch-threads", "-mbt"}, description =
"Number of threads to use for batch processing marker creation requests")
public int markerBatchNumThreads = 20;
@@ -158,6 +161,10 @@ public class TimelineService {
+ "Instants whose heartbeat is greater than the current value will
not be used in early conflict detection.")
public Long maxAllowableHeartbeatIntervalInMs = 120000L;
+ @Parameter(names = {"--instant-state-force-refresh-request-number"},
description =
+ "Used for timeline-server-based instant state requests, every N read
requests will trigger instant state refreshing")
+ public Integer instantStateForceRefreshRequestNumber = 100;
+
@Parameter(names = {"--help", "-h"})
public Boolean help = false;
@@ -179,6 +186,7 @@ public class TimelineService {
private boolean async = false;
private boolean compress = true;
private boolean enableMarkerRequests = false;
+ private boolean enableInstantStateRequests = false;
private int markerBatchNumThreads = 20;
private long markerBatchIntervalMs = 50L;
private int markerParallelism = 100;
@@ -189,6 +197,8 @@ public class TimelineService {
private Long asyncConflictDetectorPeriodMs = 30000L;
private Long maxAllowableHeartbeatIntervalInMs = 120000L;
+ private int instantStateForceRefreshRequestNumber = 100;
+
public Builder() {
}
@@ -287,6 +297,16 @@ public class TimelineService {
return this;
}
+ public Builder enableInstantStateRequests(boolean
enableCkpInstantStateRequests) {
+ this.enableInstantStateRequests = enableCkpInstantStateRequests;
+ return this;
+ }
+
+ public Builder instantStateForceRefreshRequestNumber(int
instantStateForceRefreshRequestNumber) {
+ this.instantStateForceRefreshRequestNumber =
instantStateForceRefreshRequestNumber;
+ return this;
+ }
+
public Config build() {
Config config = new Config();
config.serverPort = this.serverPort;
@@ -308,6 +328,8 @@ public class TimelineService {
config.asyncConflictDetectorInitialDelayMs =
this.asyncConflictDetectorInitialDelayMs;
config.asyncConflictDetectorPeriodMs =
this.asyncConflictDetectorPeriodMs;
config.maxAllowableHeartbeatIntervalInMs =
this.maxAllowableHeartbeatIntervalInMs;
+ config.enableInstantStateRequests = this.enableInstantStateRequests;
+ config.instantStateForceRefreshRequestNumber =
this.instantStateForceRefreshRequestNumber;
return config;
}
}
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/InstantStateHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/InstantStateHandler.java
new file mode 100644
index 00000000000..e7dc39acb97
--- /dev/null
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/InstantStateHandler.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.table.timeline.dto.InstantStateDTO;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
/**
 * REST Handler servicing instant state requests.
 * <p>
 * The instant states are cached in timeline server and will be refreshed after the instant states in file system were changed.
 */
public class InstantStateHandler extends Handler {

  private static final Logger LOG = LoggerFactory.getLogger(InstantStateHandler.class);

  /**
   * Base url for instant state requests.
   */
  private static final String BASE_URL = "/v1/hoodie/instantstate";

  /**
   * Param for instant state requests: the instant state directory path
   * (presumably encodes the writer's uniqueId in its last path segment — verify against CkpMetadata).
   */
  public static final String INSTANT_STATE_DIR_PATH_PARAM = "instantstatedirpath";

  /**
   * GET requests. Returns all the instant states under instant state path.
   */
  public static final String ALL_INSTANT_STATE_URL = String.format("%s/%s", BASE_URL, "all");

  /**
   * POST requests. Refresh the instant state data cached in memory.
   */
  public static final String REFRESH_INSTANT_STATE = String.format("%s/%s", BASE_URL, "refresh/");

  /**
   * Cached instant state data, instant state path -> list of instant states in fs.
   */
  private final ConcurrentHashMap<String, List<InstantStateDTO>> cachedInstantStates;

  /**
   * Number of requests after the last refresh.
   * NOTE(review): this counter is global across all cached paths, not per path — with
   * multiple writers a refresh on one path resets the countdown for every path.
   */
  private final AtomicLong requestCount;

  public InstantStateHandler(Configuration conf, TimelineService.Config timelineServiceConfig, FileSystem fileSystem,
                             FileSystemViewManager viewManager) throws IOException {
    super(conf, timelineServiceConfig, fileSystem, viewManager);
    this.cachedInstantStates = new ConcurrentHashMap<>();
    this.requestCount = new AtomicLong();
  }

  /**
   * Read instant states from the cache or the file system.
   *
   * @return Instant states under the input instant state path.
   */
  public List<InstantStateDTO> getAllInstantStates(String instantStatePath) {
    if (requestCount.incrementAndGet() >= timelineServiceConfig.instantStateForceRefreshRequestNumber) {
      // Do refresh for every N requests to ensure the writers won't be blocked forever
      refresh(instantStatePath);
    }
    // First request for a path (or first after eviction) scans the file system lazily.
    return cachedInstantStates.computeIfAbsent(instantStatePath, k -> scanInstantState(new Path(k)));
  }

  /**
   * Refresh the checkpoint messages cached. Will be called when coordinator start/commit/abort instant.
   *
   * @return Whether refreshing is successful.
   */
  public boolean refresh(String instantStatePath) {
    try {
      cachedInstantStates.put(instantStatePath, scanInstantState(new Path(instantStatePath)));
      // Reset the forced-refresh countdown after a successful rescan.
      requestCount.set(0);
    } catch (Exception e) {
      // Return false instead of throwing: the caller (client refresh request) treats this as
      // best-effort and the periodic forced refresh will retry.
      LOG.error("Failed to load instant states, path: " + instantStatePath, e);
      return false;
    }
    return true;
  }

  /**
   * Scan the instant states from file system.
   *
   * @throws HoodieIOException if listing the directory fails.
   */
  public List<InstantStateDTO> scanInstantState(Path instantStatePath) {
    try {
      // Check instantStatePath exists before list status, see HUDI-5915
      if (this.fileSystem.exists(instantStatePath)) {
        return Arrays.stream(this.fileSystem.listStatus(instantStatePath)).map(InstantStateDTO::fromFileStatus).collect(Collectors.toList());
      } else {
        return Collections.emptyList();
      }
    } catch (IOException e) {
      throw new HoodieIOException("Failed to load instant states, path: " + instantStatePath, e);
    }
  }

}