This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 33436aa Revert "[HUDI-2677] Add DFS based message queue for flink writer (#3915)" (#3923)
33436aa is described below
commit 33436aa359093246f31b53530d8890a4ce07cfcb
Author: Danny Chan <[email protected]>
AuthorDate: Thu Nov 4 20:48:57 2021 +0800
Revert "[HUDI-2677] Add DFS based message queue for flink writer (#3915)" (#3923)
This reverts commit dbf8c44bdb3019f2ce93d6b1224d9d478c0340fa.
---
.../org/apache/hudi/sink/StreamWriteFunction.java | 8 +-
.../hudi/sink/StreamWriteOperatorCoordinator.java | 65 +++++---
.../hudi/sink/bulk/BulkInsertWriteFunction.java | 40 ++---
.../sink/common/AbstractStreamWriteFunction.java | 98 ++----------
.../org/apache/hudi/sink/message/MessageBus.java | 173 ---------------------
.../apache/hudi/sink/message/MessageClient.java | 126 ---------------
.../apache/hudi/sink/message/MessageDriver.java | 132 ----------------
.../apache/hudi/sink/message/TestMessageBus.java | 137 ----------------
.../sink/utils/StreamWriteFunctionWrapper.java | 1 -
9 files changed, 79 insertions(+), 701 deletions(-)
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 11564d1..0e7e35e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -137,7 +137,6 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
@Override
public void close() {
- super.close();
if (this.writeClient != null) {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
@@ -402,6 +401,11 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
}
}
+ private boolean hasData() {
+ return this.buckets.size() > 0
+ && this.buckets.values().stream().anyMatch(bucket ->
bucket.records.size() > 0);
+ }
+
@SuppressWarnings("unchecked, rawtypes")
private boolean flushBucket(DataBucket bucket) {
String instant = instantToWrite(true);
@@ -435,7 +439,7 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
@SuppressWarnings("unchecked, rawtypes")
private void flushRemaining(boolean endInput) {
- this.currentInstant = instantToWrite(false);
+ this.currentInstant = instantToWrite(hasData());
if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data
throw new HoodieException("No inflight instant when flushing data!");
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index a30d766..feb348f 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -30,9 +30,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.message.MessageBus;
-import org.apache.hudi.sink.message.MessageDriver;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
@@ -42,6 +41,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,11 +137,6 @@ public class StreamWriteOperatorCoordinator
private transient TableState tableState;
/**
- * The message driver.
- */
- private MessageDriver messageDriver;
-
- /**
* Constructs a StreamingSinkOperatorCoordinator.
*
* @param conf The config options
@@ -179,7 +174,6 @@ public class StreamWriteOperatorCoordinator
if (tableState.syncMetadata) {
initMetadataSync();
}
- this.messageDriver = MessageBus.getDriver(this.metaClient.getFs(),
metaClient.getBasePath());
}
@Override
@@ -197,9 +191,6 @@ public class StreamWriteOperatorCoordinator
writeClient.close();
}
this.eventBuffer = null;
- if (this.messageDriver != null) {
- this.messageDriver.close();
- }
}
@Override
@@ -236,7 +227,7 @@ public class StreamWriteOperatorCoordinator
writeClient.scheduleCompaction(Option.empty());
}
// start new instant.
- startInstant(checkpointId);
+ startInstant();
// sync Hive if is enabled
syncHiveIfEnabled();
}
@@ -246,7 +237,12 @@ public class StreamWriteOperatorCoordinator
@Override
public void notifyCheckpointAborted(long checkpointId) {
- this.messageDriver.abortCkp(checkpointId);
+ // once the checkpoint was aborted, unblock the writer tasks to
+ // reuse the last instant.
+ if (!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
+ executor.execute(() -> sendCommitAckEvents(checkpointId),
+ "unblock data write with aborted checkpoint %s", checkpointId);
+ }
}
@Override
@@ -337,19 +333,12 @@ public class StreamWriteOperatorCoordinator
}
private void startInstant() {
- // the flink checkpoint id starts from 1,
- // see AbstractStreamWriteFunction#ackInstant
- startInstant(MessageBus.INITIAL_CKP_ID);
- }
-
- private void startInstant(long checkpoint) {
final String instant = HoodieActiveTimeline.createNewInstantTime();
this.writeClient.startCommitWithTime(instant, tableState.commitAction);
-
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction,
instant);
- this.writeClient.upgradeDowngrade(instant);
- this.messageDriver.commitCkp(checkpoint, this.instant, instant);
this.instant = instant;
- LOG.info("Create instant [{}] for table [{}] with type [{}]", instant,
+
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction,
this.instant);
+ this.writeClient.upgradeDowngrade(this.instant);
+ LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
this.conf.getString(FlinkOptions.TABLE_NAME),
conf.getString(FlinkOptions.TABLE_TYPE));
}
@@ -409,6 +398,33 @@ public class StreamWriteOperatorCoordinator
}
/**
+ * The coordinator reuses the instant if there is no data for this round of
checkpoint,
+ * sends the commit ack events to unblock the flushing.
+ */
+ private void sendCommitAckEvents(long checkpointId) {
+ CompletableFuture<?>[] futures =
Arrays.stream(this.gateways).filter(Objects::nonNull)
+ .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
+ .toArray(CompletableFuture<?>[]::new);
+ try {
+ CompletableFuture.allOf(futures).get();
+ } catch (Throwable throwable) {
+ if (!sendToFinishedTasks(throwable)) {
+ throw new HoodieException("Error while waiting for the commit ack
events to finish sending", throwable);
+ }
+ }
+ }
+
+ /**
+ * Decides whether the given exception is caused by sending events to
FINISHED tasks.
+ *
+ * <p>Ugly impl: the exception may change in the future.
+ */
+ private static boolean sendToFinishedTasks(Throwable throwable) {
+ return throwable.getCause() instanceof TaskNotRunningException
+ || throwable.getCause().getMessage().contains("running");
+ }
+
+ /**
* Commits the instant.
*/
private void commitInstant(String instant) {
@@ -435,7 +451,8 @@ public class StreamWriteOperatorCoordinator
if (writeResults.size() == 0) {
// No data has written, reset the buffer and returns early
reset();
- messageDriver.commitCkp(checkpointId, this.instant, this.instant);
+ // Send commit ack event to the write function to unblock the flushing
+ sendCommitAckEvents(checkpointId);
return false;
}
doCommit(instant, writeResults);
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index f5fda5a..f3cfbae 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -21,13 +21,11 @@ package org.apache.hudi.sink.bulk;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.message.MessageBus;
-import org.apache.hudi.sink.message.MessageClient;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.StreamerUtil;
@@ -40,8 +38,6 @@ import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -84,19 +80,24 @@ public class BulkInsertWriteFunction<I>
private int taskID;
/**
+ * Meta Client.
+ */
+ private transient HoodieTableMetaClient metaClient;
+
+ /**
* Write Client.
*/
private transient HoodieFlinkWriteClient writeClient;
/**
- * Gateway to send operator events to the operator coordinator.
+ * The initial inflight instant when start up.
*/
- private transient OperatorEventGateway eventGateway;
+ private volatile String initInstant;
/**
- * The message client.
+ * Gateway to send operator events to the operator coordinator.
*/
- private MessageClient messageClient;
+ private transient OperatorEventGateway eventGateway;
/**
* Constructs a StreamingSinkFunction.
@@ -111,8 +112,9 @@ public class BulkInsertWriteFunction<I>
@Override
public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+ this.metaClient = StreamerUtil.createMetaClient(this.config);
this.writeClient = StreamerUtil.createWriteClient(this.config,
getRuntimeContext());
- this.messageClient =
MessageBus.getClient(config.getString(FlinkOptions.PATH));
+ this.initInstant = StreamerUtil.getLastPendingInstant(this.metaClient,
false);
sendBootstrapEvent();
initWriterHelper();
}
@@ -128,9 +130,6 @@ public class BulkInsertWriteFunction<I>
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
}
- if (this.messageClient != null) {
- this.messageClient.close();
- }
}
/**
@@ -184,17 +183,8 @@ public class BulkInsertWriteFunction<I>
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].",
taskID);
}
- /**
- * Returns the next instant to write from the message bus.
- */
- @Nullable
- private String ackInstant() {
- Option<MessageBus.CkpMessage> ckpMessageOption =
this.messageClient.getCkpMessage(MessageBus.INITIAL_CKP_ID);
- return ckpMessageOption.map(message ->
message.inflightInstant).orElse(null);
- }
-
private String instantToWrite() {
- String instant = ackInstant();
+ String instant = StreamerUtil.getLastPendingInstant(this.metaClient);
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout
threshold hits.
TimeWait timeWait = TimeWait.builder()
@@ -202,14 +192,14 @@ public class BulkInsertWriteFunction<I>
.action("instant initialize")
.throwsT(true)
.build();
- while (instant == null) {
+ while (instant == null || instant.equals(this.initInstant)) {
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change
// sleep for a while
timeWait.waitFor();
// refresh the inflight instant
- instant = ackInstant();
+ instant = StreamerUtil.getLastPendingInstant(this.metaClient);
}
return instant;
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index c3fcec0..5ad2935 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -21,14 +21,11 @@ package org.apache.hudi.sink.common;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.message.MessageBus;
-import org.apache.hudi.sink.message.MessageClient;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.StreamerUtil;
@@ -42,14 +39,12 @@ import
org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.util.CollectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
/**
* Base infrastructures for streaming writer function.
@@ -125,11 +120,6 @@ public abstract class AbstractStreamWriteFunction<I>
private long checkpointId = -1;
/**
- * The message client.
- */
- private MessageClient messageClient;
-
- /**
* Constructs a StreamWriteFunctionBase.
*
* @param config The config options
@@ -150,6 +140,7 @@ public abstract class AbstractStreamWriteFunction<I>
TypeInformation.of(WriteMetadataEvent.class)
));
+ this.currentInstant = lastPendingInstant();
if (context.isRestored()) {
restoreWriteMetadata();
} else {
@@ -157,7 +148,6 @@ public abstract class AbstractStreamWriteFunction<I>
}
// blocks flushing until the coordinator starts a new instant
this.confirming = true;
- this.messageClient = MessageBus.getClient(this.metaClient.getFs(),
this.metaClient.getBasePath());
}
@Override
@@ -187,19 +177,14 @@ public abstract class AbstractStreamWriteFunction<I>
// -------------------------------------------------------------------------
private void restoreWriteMetadata() throws Exception {
- List<WriteMetadataEvent> events =
CollectionUtil.iterableToList(this.writeMetadataState.get());
+ String lastInflight = lastPendingInstant();
boolean eventSent = false;
- if (events.size() > 0) {
- boolean committed = this.metaClient.getActiveTimeline()
- .filterCompletedInstants()
- .containsInstant(events.get(0).getInstantTime());
- if (!committed) {
- for (WriteMetadataEvent event : events) {
- // The checkpoint succeed but the meta does not commit,
- // re-commit the inflight instant
- this.eventGateway.sendEventToCoordinator(event);
- LOG.info("Send uncommitted write metadata event to coordinator,
task[{}].", taskID);
- }
+ for (WriteMetadataEvent event : this.writeMetadataState.get()) {
+ if (Objects.equals(lastInflight, event.getInstantTime())) {
+ // The checkpoint succeed but the meta does not commit,
+ // re-commit the inflight instant
+ this.eventGateway.sendEventToCoordinator(event);
+ LOG.info("Send uncommitted write metadata event to coordinator,
task[{}].", taskID);
eventSent = true;
}
}
@@ -237,65 +222,21 @@ public abstract class AbstractStreamWriteFunction<I>
}
}
- @Override
- public void close() {
- if (this.messageClient != null) {
- this.messageClient.close();
- }
- }
-
/**
* Returns the last pending instant time.
*/
- private String lastPendingInstant() {
- return StreamerUtil.getLastPendingInstant(metaClient);
- }
-
- /**
- * Returns the previous committed checkpoint id.
- *
- * @param eagerFlush Whether the data flush happens before the checkpoint
barrier arrives
- */
- private long prevCkp(boolean eagerFlush) {
- // Use the last checkpoint id to request for the message,
- // the time sequence of committed checkpoints and ongoing
- // checkpoints are as following:
-
- // 0 ------------ 1 ------------ 2 ------------ 3 ------------>
committed ckp id
- // | / / / /
- // |--- ckp-1 ----|--- ckp-2 ----|--- ckp-3 ----|--- ckp-4 ----| ongoing
ckp id
-
- // Use 0 as the initial committed checkpoint id, the 0th checkpoint
message records the writing instant for ckp-1;
- // when ckp-1 success event is received, commits a checkpoint message with
the writing instant for ckp-2;
- // that means, the checkpoint message records the writing instant of next
checkpoint.
- return Math.max(0, eagerFlush ? this.checkpointId : this.checkpointId - 1);
- }
-
- /**
- * Returns the next instant to write from the message bus.
- *
- * <p>It returns 3 kinds of value:
- * i) normal instant time: the previous checkpoint succeed;
- * ii) 'aborted' instant time: the previous checkpoint has been aborted;
- * ii) null: the checkpoint is till ongoing without any notifications.
- */
- @Nullable
- protected String ackInstant(long checkpointId) {
- Option<MessageBus.CkpMessage> ckpMessageOption =
this.messageClient.getCkpMessage(checkpointId);
- return ckpMessageOption.map(message ->
message.inflightInstant).orElse(null);
+ protected String lastPendingInstant() {
+ return StreamerUtil.getLastPendingInstant(this.metaClient);
}
/**
* Prepares the instant time to write with for next checkpoint.
*
- * @param eagerFlush Whether the data flush happens before the checkpoint
barrier arrives
- *
+ * @param hasData Whether the task has buffering data
* @return The instant time
*/
- protected String instantToWrite(boolean eagerFlush) {
- final long ckpId = prevCkp(eagerFlush);
- String instant = ackInstant(ckpId);
-
+ protected String instantToWrite(boolean hasData) {
+ String instant = lastPendingInstant();
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout
threshold hits.
TimeWait timeWait = TimeWait.builder()
@@ -306,23 +247,18 @@ public abstract class AbstractStreamWriteFunction<I>
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change and the checkpoint has
buffering data
- if (instant == null) {
+ if (instant == null || (instant.equals(this.currentInstant) && hasData))
{
// sleep for a while
boolean timeout = timeWait.waitFor();
- if (timeout && MessageBus.notInitialCkp(ckpId)) {
+ if (timeout && instant != null) {
// if the timeout threshold hits but the last instant still not
commit,
// and the task does not receive commit ask event(no data or aborted
checkpoint),
// assumes the checkpoint was canceled silently and unblock the data
flushing
confirming = false;
- instant = lastPendingInstant();
} else {
// refresh the inflight instant
- instant = ackInstant(ckpId);
+ instant = lastPendingInstant();
}
- } else if (MessageBus.canAbort(instant, ckpId)) {
- // the checkpoint was canceled, reuse the last instant
- confirming = false;
- instant = lastPendingInstant();
} else {
// the pending instant changed, that means the last instant was
committed
// successfully.
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java
deleted file mode 100644
index ff8f3eb..0000000
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.message;
-
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.util.StreamerUtil;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
-/**
- * A message bus for transferring the checkpoint messages.
- *
- * <p>Each time the driver starts a new instant, it writes a commit message
into the bus, the write tasks
- * then consume the message and unblocking the data flush.
- *
- * <p>Why we use the DFS based message queue instead of sending
- * the {@link org.apache.flink.runtime.operators.coordination.OperatorEvent} ?
- * The write task handles the operator event using the main mailbox executor
which has the lowest priority for mails,
- * it is also used to process the inputs. When the write task blocks and waits
for the operator event to ack the valid instant to write,
- * it actually blocks all the following events in the mailbox, the operator
event can never be consumed then it causes deadlock.
- *
- * <p>The message bus is also more lightweight than the active timeline.
- */
-public abstract class MessageBus implements AutoCloseable {
-
- public static final long INITIAL_CKP_ID = 0L;
-
- public static final String ABORTED_CKP_INSTANT = "aborted";
-
- protected static final int MESSAGE_QUEUE_LENGTH = 20;
-
- protected static final int CLIENT_MESSAGE_CACHE_SIZE = 10;
-
- private static final String MESSAGE_BUS = "message_bus";
-
- private static final String COMMIT = "commit";
-
- private static final String COMMIT_EXTENSION = "." + COMMIT;
- private static final String ABORTED_EXTENSION = ".aborted";
-
- protected final FileSystem fs;
- protected final String basePath;
- protected final String messageBusPath;
-
- protected MessageBus(FileSystem fs, String basePath) {
- this.fs = fs;
- this.basePath = basePath;
- this.messageBusPath = messageBusPath(basePath);
- }
-
- public static MessageDriver getDriver(FileSystem fs, String basePath) {
- return MessageDriver.getInstance(fs, basePath);
- }
-
- public static MessageClient getClient(FileSystem fs, String basePath) {
- return MessageClient.getSingleton(fs, basePath);
- }
-
- public static MessageClient getClient(String basePath) {
- FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf());
- return MessageClient.getSingleton(fs, basePath);
- }
-
- // -------------------------------------------------------------------------
- // Utilities
- // -------------------------------------------------------------------------
- public static boolean canAbort(String instant, long checkpointId) {
- return ABORTED_CKP_INSTANT.equals(instant) &&
MessageBus.notInitialCkp(checkpointId);
- }
-
- public static boolean notInitialCkp(long checkpointId) {
- return checkpointId != INITIAL_CKP_ID;
- }
-
- protected Path fullFilePath(String fileName) {
- return new Path(messageBusPath, fileName);
- }
-
- protected static String messageBusPath(String basePath) {
- return basePath + Path.SEPARATOR +
HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + MESSAGE_BUS;
- }
-
- protected static String getCommitFileName(long checkpointId) {
- return checkpointId + COMMIT_EXTENSION;
- }
-
- protected static String getAbortedFileName(long checkpointId) {
- return checkpointId + ABORTED_EXTENSION;
- }
-
- // -------------------------------------------------------------------------
- // Inner Class
- // -------------------------------------------------------------------------
-
- /**
- * A checkpoint message.
- */
- public static class CkpMessage {
- private static final String SEPARATOR = ",";
-
- public final boolean committed; // whether the checkpoint is committed
-
- public final long checkpointId;
- public final String commitInstant;
- public final String inflightInstant;
-
- private CkpMessage(long checkpointId, String commitInstant, String
inflightInstant) {
- this.committed = true;
- this.checkpointId = checkpointId;
- this.commitInstant = commitInstant;
- this.inflightInstant = inflightInstant;
- }
-
- private CkpMessage(long checkpointId) {
- this.committed = false;
- this.checkpointId = checkpointId;
- this.commitInstant = ABORTED_CKP_INSTANT;
- this.inflightInstant = ABORTED_CKP_INSTANT;
- }
-
- /**
- * Encodes the instants as 'commitInstant,inflightInstant'.
- */
- public static byte[] toBytes(String commitInstant, String inflightInstant)
{
- return (commitInstant + SEPARATOR +
inflightInstant).getBytes(StandardCharsets.UTF_8);
- }
-
- public static CkpMessage fromBytes(long checkpointId, byte[] bytes) {
- String content = new String(bytes, StandardCharsets.UTF_8);
- String[] splits = content.split(SEPARATOR);
- return new CkpMessage(checkpointId, splits[0], splits[1]);
- }
-
- public static CkpMessage fromPath(FileSystem fs, Path path) throws
IOException {
- final String[] splits = path.getName().split("\\.");
- ValidationUtils.checkState(splits.length == 2, "Invalid checkpoint
message file name: " + path.getName());
- final long checkpointId = Long.parseLong(splits[0]);
- final String suffix = splits[1];
- if (suffix.equals(COMMIT)) {
- try (FSDataInputStream is = fs.open(path)) {
- byte[] bytes = FileIOUtils.readAsByteArray(is);
- return CkpMessage.fromBytes(checkpointId, bytes);
- }
- } else {
- return new CkpMessage(checkpointId);
- }
- }
- }
-}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java
deleted file mode 100644
index ea893d5..0000000
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.message;
-
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieException;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * A client that consumes messages from the {@link MessageBus}.
- */
-public class MessageClient extends MessageBus {
- private static final Logger LOG =
LoggerFactory.getLogger(MessageClient.class);
-
- private static final Map<String, MessageClient> CLIENTS = new HashMap<>();
-
- private final TreeMap<Long, CkpMessage> ckpCache; // checkpoint id ->
CkpMessage mapping
-
- private MessageClient(FileSystem fs, String basePath) throws IOException {
- super(fs, basePath);
- this.ckpCache = new TreeMap<>();
- }
-
- /**
- * Returns the message bus instance.
- *
- * <p>This expects to be called by the client.
- *
- * @param fs The filesystem
- * @param basePath The table base path
- * @return The instance of message bus
- */
- private static MessageClient getInstance(FileSystem fs, String basePath) {
- try {
- return new MessageClient(fs, basePath);
- } catch (IOException e) {
- throw new HoodieException("Initialize checkpoint message bus error", e);
- }
- }
-
- /**
- * Returns the singleton message bus instance.
- *
- * <p>This expects to be called by the client.
- *
- * @param fs The filesystem
- * @param basePath The table base path
- * @return The instance of message bus
- */
- public static synchronized MessageClient getSingleton(FileSystem fs, String
basePath) {
- return CLIENTS.computeIfAbsent(basePath,
- k -> getInstance(fs, basePath));
- }
-
- public synchronized Option<CkpMessage> getCkpMessage(long checkpointId) {
- if (this.ckpCache.size() >= CLIENT_MESSAGE_CACHE_SIZE) {
- this.ckpCache.pollFirstEntry();
- }
- if (this.ckpCache.containsKey(checkpointId)) {
- return Option.of(this.ckpCache.get(checkpointId));
- }
- final Path commitFilePath = fullFilePath(getCommitFileName(checkpointId));
- try {
- if (fs.exists(commitFilePath)) {
- CkpMessage ckpMessage = CkpMessage.fromPath(fs, commitFilePath);
- this.ckpCache.put(checkpointId, ckpMessage);
- return Option.of(ckpMessage);
- }
- } catch (Throwable e) {
- // ignored
- LOG.warn("Read committed checkpoint message error: " + checkpointId, e);
- return Option.empty();
- }
- final Path abortedFilePath =
fullFilePath(getAbortedFileName(checkpointId));
- try {
- if (fs.exists(abortedFilePath)) {
- CkpMessage ckpMessage = CkpMessage.fromPath(fs, abortedFilePath);
- this.ckpCache.put(checkpointId, ckpMessage);
- return Option.of(ckpMessage);
- }
- } catch (Throwable e) {
- // ignored
- LOG.warn("Read aborted checkpoint message error: " + checkpointId, e);
- return Option.empty();
- }
- return Option.empty();
- }
-
- @VisibleForTesting
- public TreeMap<Long, CkpMessage> getCkpCache() {
- return ckpCache;
- }
-
- @Override
- public void close() {
- synchronized (this) {
- this.ckpCache.clear();
- }
- }
-}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java
deleted file mode 100644
index bf98209..0000000
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.message;
-
-import org.apache.hudi.exception.HoodieException;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * A driver that produces messages to the {@link MessageBus}.
- */
-public class MessageDriver extends MessageBus {
- private final TreeMap<Long, Boolean> ckpIdCache; // checkpoint id ->
isCommitted mapping
-
- public MessageDriver(FileSystem fs, String basePath) throws IOException {
- super(fs, basePath);
- this.ckpIdCache = new TreeMap<>();
- initialize();
- }
-
- /**
- * Returns the message bus instance.
- *
- * <p>This expects to be called by the driver.
- *
- * @param fs The filesystem
- * @param basePath The table base path
- * @return The instance of message bus
- */
- public static MessageDriver getInstance(FileSystem fs, String basePath) {
- try {
- return new MessageDriver(fs, basePath);
- } catch (IOException e) {
- throw new HoodieException("Initialize checkpoint message bus error", e);
- }
- }
-
- /**
- * Initialize the message bus, would clean all the messages.
- *
- * <p>This expects to be called by the driver.
- */
- private void initialize() throws IOException {
- Path path = new Path(messageBusPath(basePath));
- if (fs.exists(path)) {
- fs.delete(path, true);
- }
- fs.mkdirs(path);
- }
-
- /**
- * Add a checkpoint commit message.
- *
- * @param checkpointId The checkpoint id
- * @param commitInstant The committed instant
- * @param inflightInstant The new inflight instant
- */
- public void commitCkp(long checkpointId, String commitInstant, String
inflightInstant) {
- Path path = fullFilePath(getCommitFileName(checkpointId));
-
- try (FSDataOutputStream outputStream = fs.create(path, true)) {
- byte[] bytes = CkpMessage.toBytes(commitInstant, inflightInstant);
- outputStream.write(bytes);
- outputStream.close();
- this.ckpIdCache.put(checkpointId, true);
- clean();
- } catch (Throwable e) {
- throw new HoodieException("Adding committed message error for
checkpoint: " + checkpointId, e);
- }
- }
-
- /**
- * Add an aborted checkpoint message.
- *
- * @param checkpointId The checkpoint id
- */
- public void abortCkp(long checkpointId) {
- Path path = fullFilePath(getAbortedFileName(checkpointId));
- try {
- fs.createNewFile(path);
- this.ckpIdCache.put(checkpointId, false);
- clean();
- } catch (Throwable e) {
- throw new HoodieException("Adding aborted message error for checkpoint:
" + checkpointId, e);
- }
- }
-
- private void clean() throws IOException {
- int numToClean = this.ckpIdCache.size() - MESSAGE_QUEUE_LENGTH;
- if (numToClean >= 10) {
- for (int i = 0; i < numToClean; i++) {
- Map.Entry<Long, Boolean> entry = this.ckpIdCache.pollFirstEntry();
- final String fileName = entry.getValue() ?
getCommitFileName(entry.getKey()) : getAbortedFileName(entry.getKey());
- final Path filePath = fullFilePath(fileName);
- fs.delete(filePath, false);
- }
- }
- }
-
- @VisibleForTesting
- public TreeMap<Long, Boolean> getCkpIdCache() {
- return ckpIdCache;
- }
-
- @Override
- public void close() throws Exception {
- this.ckpIdCache.clear();
- }
-}
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/message/TestMessageBus.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/message/TestMessageBus.java
deleted file mode 100644
index b161c96..0000000
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/message/TestMessageBus.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.message;
-
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.util.StreamerUtil;
-import org.apache.hudi.utils.TestConfigurations;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.IntStream;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Test cases for {@link MessageBus}.
- */
-public class TestMessageBus {
-
- private String basePath;
- private FileSystem fs;
-
- private MessageDriver driver;
-
- @TempDir
- File tempFile;
-
- @BeforeEach
- public void beforeEach() throws Exception {
- basePath = tempFile.getAbsolutePath();
- this.fs = FSUtils.getFs(tempFile.getAbsolutePath(),
StreamerUtil.getHadoopConf());
-
- Configuration conf = TestConfigurations.getDefaultConf(basePath);
- StreamerUtil.initTableIfNotExists(conf);
-
- this.driver = MessageDriver.getInstance(fs, basePath);
- }
-
- @Test
- void testWriteAndReadMessage() {
- MessageClient client = MessageClient.getSingleton(fs, basePath);
-
- // write and read 5 committed checkpoints
- IntStream.range(0, 5).forEach(i -> driver.commitCkp(i, i + "", i + 1 +
""));
-
- IntStream.range(0, 5).forEach(i -> {
- Option<MessageBus.CkpMessage> messageOpt = client.getCkpMessage(i);
- assertTrue(messageOpt.isPresent());
-
- MessageBus.CkpMessage ckpMessage = messageOpt.get();
- assertTrue(ckpMessage.committed);
- assertThat(ckpMessage.commitInstant, is(i + ""));
- assertThat(ckpMessage.inflightInstant, is(i + 1 + ""));
- });
-
- // write and read 5 aborted checkpoints
- IntStream.range(5, 10).forEach(i -> driver.abortCkp(i));
-
- IntStream.range(5, 10).forEach(i -> {
- Option<MessageBus.CkpMessage> messageOpt = client.getCkpMessage(i);
- assertTrue(messageOpt.isPresent());
-
- MessageBus.CkpMessage ckpMessage = messageOpt.get();
- assertFalse(ckpMessage.committed);
- assertThat(ckpMessage.commitInstant, is(MessageBus.ABORTED_CKP_INSTANT));
- assertThat(ckpMessage.inflightInstant,
is(MessageBus.ABORTED_CKP_INSTANT));
- });
- }
-
- @Test
- void testWriteCleaning() {
- // write and read 20 committed checkpoints
- IntStream.range(0, 20).forEach(i -> driver.commitCkp(i, i + "", i + 1 +
""));
- assertThat("The id cache should not be cleaned",
driver.getCkpIdCache().size(), is(20));
-
- // write and read 10 aborted checkpoints
- IntStream.range(20, 29).forEach(i -> driver.abortCkp(i));
- assertThat("The id cache should not be cleaned",
driver.getCkpIdCache().size(), is(29));
-
- driver.commitCkp(29, "29", "30");
- assertThat("The cache should be cleaned", driver.getCkpIdCache().size(),
is(20));
- assertThat(longSet2String(driver.getCkpIdCache().keySet()),
- is("10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29"));
- }
-
- @Test
- void testReadCleaning() {
- MessageClient client = MessageClient.getSingleton(fs, basePath);
-
- // write and read 20 committed checkpoints
- IntStream.range(0, 20).forEach(i -> driver.commitCkp(i, i + "", i + 1 +
""));
-
- IntStream.range(0, 10).forEach(client::getCkpMessage);
- assertThat("The checkpoint cache should not be cleaned",
client.getCkpCache().size(), is(10));
-
- client.getCkpMessage(10);
- assertThat("The checkpoint cache should be cleaned",
client.getCkpCache().size(), is(10));
-
- IntStream.range(11, 15).forEach(client::getCkpMessage);
- assertThat("The checkpoint cache should be cleaned",
client.getCkpCache().size(), is(10));
- assertThat(longSet2String(client.getCkpCache().keySet()),
is("5,6,7,8,9,10,11,12,13,14"));
- }
-
- private static String longSet2String(Set<Long> longSet) {
- List<String> elements = new ArrayList<>();
- longSet.stream().mapToInt(Long::intValue).forEach(i -> elements.add(i +
""));
- return String.join(",", elements);
- }
-}
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index f1f5a1f..54a142a 100644
---
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -248,7 +248,6 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
public void close() throws Exception {
coordinator.close();
ioManager.close();
- writeFunction.close();
}
public StreamWriteOperatorCoordinator getCoordinator() {