This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new dbf8c44  [HUDI-2677] Add DFS based message queue for flink writer 
(#3915)
dbf8c44 is described below

commit dbf8c44bdb3019f2ce93d6b1224d9d478c0340fa
Author: Danny Chan <[email protected]>
AuthorDate: Thu Nov 4 18:09:00 2021 +0800

    [HUDI-2677] Add DFS based message queue for flink writer (#3915)
---
 .../org/apache/hudi/sink/StreamWriteFunction.java  |   8 +-
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  65 +++-----
 .../hudi/sink/bulk/BulkInsertWriteFunction.java    |  40 +++--
 .../sink/common/AbstractStreamWriteFunction.java   |  98 ++++++++++--
 .../org/apache/hudi/sink/message/MessageBus.java   | 173 +++++++++++++++++++++
 .../apache/hudi/sink/message/MessageClient.java    | 126 +++++++++++++++
 .../apache/hudi/sink/message/MessageDriver.java    | 132 ++++++++++++++++
 .../apache/hudi/sink/message/TestMessageBus.java   | 137 ++++++++++++++++
 .../sink/utils/StreamWriteFunctionWrapper.java     |   1 +
 9 files changed, 701 insertions(+), 79 deletions(-)

diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 0e7e35e..11564d1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -137,6 +137,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
 
   @Override
   public void close() {
+    super.close();
     if (this.writeClient != null) {
       this.writeClient.cleanHandlesGracefully();
       this.writeClient.close();
@@ -401,11 +402,6 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     }
   }
 
-  private boolean hasData() {
-    return this.buckets.size() > 0
-        && this.buckets.values().stream().anyMatch(bucket -> 
bucket.records.size() > 0);
-  }
-
   @SuppressWarnings("unchecked, rawtypes")
   private boolean flushBucket(DataBucket bucket) {
     String instant = instantToWrite(true);
@@ -439,7 +435,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
 
   @SuppressWarnings("unchecked, rawtypes")
   private void flushRemaining(boolean endInput) {
-    this.currentInstant = instantToWrite(hasData());
+    this.currentInstant = instantToWrite(false);
     if (this.currentInstant == null) {
       // in case there are empty checkpoints that has no input data
       throw new HoodieException("No inflight instant when flushing data!");
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index feb348f..a30d766 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -30,8 +30,9 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.message.MessageBus;
+import org.apache.hudi.sink.message.MessageDriver;
 import org.apache.hudi.sink.utils.HiveSyncContext;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.util.StreamerUtil;
@@ -41,7 +42,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -137,6 +137,11 @@ public class StreamWriteOperatorCoordinator
   private transient TableState tableState;
 
   /**
+   * The message driver.
+   */
+  private MessageDriver messageDriver;
+
+  /**
    * Constructs a StreamingSinkOperatorCoordinator.
    *
    * @param conf    The config options
@@ -174,6 +179,7 @@ public class StreamWriteOperatorCoordinator
     if (tableState.syncMetadata) {
       initMetadataSync();
     }
+    this.messageDriver = MessageBus.getDriver(this.metaClient.getFs(), 
metaClient.getBasePath());
   }
 
   @Override
@@ -191,6 +197,9 @@ public class StreamWriteOperatorCoordinator
       writeClient.close();
     }
     this.eventBuffer = null;
+    if (this.messageDriver != null) {
+      this.messageDriver.close();
+    }
   }
 
   @Override
@@ -227,7 +236,7 @@ public class StreamWriteOperatorCoordinator
               writeClient.scheduleCompaction(Option.empty());
             }
             // start new instant.
-            startInstant();
+            startInstant(checkpointId);
             // sync Hive if is enabled
             syncHiveIfEnabled();
           }
@@ -237,12 +246,7 @@ public class StreamWriteOperatorCoordinator
 
   @Override
   public void notifyCheckpointAborted(long checkpointId) {
-    // once the checkpoint was aborted, unblock the writer tasks to
-    // reuse the last instant.
-    if (!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
-      executor.execute(() -> sendCommitAckEvents(checkpointId),
-          "unblock data write with aborted checkpoint %s", checkpointId);
-    }
+    this.messageDriver.abortCkp(checkpointId);
   }
 
   @Override
@@ -333,12 +337,19 @@ public class StreamWriteOperatorCoordinator
   }
 
   private void startInstant() {
+    // the flink checkpoint id starts from 1,
+    // see AbstractStreamWriteFunction#ackInstant
+    startInstant(MessageBus.INITIAL_CKP_ID);
+  }
+
+  private void startInstant(long checkpoint) {
     final String instant = HoodieActiveTimeline.createNewInstantTime();
     this.writeClient.startCommitWithTime(instant, tableState.commitAction);
+    
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction,
 instant);
+    this.writeClient.upgradeDowngrade(instant);
+    this.messageDriver.commitCkp(checkpoint, this.instant, instant);
     this.instant = instant;
-    
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction,
 this.instant);
-    this.writeClient.upgradeDowngrade(this.instant);
-    LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
+    LOG.info("Create instant [{}] for table [{}] with type [{}]", instant,
         this.conf.getString(FlinkOptions.TABLE_NAME), 
conf.getString(FlinkOptions.TABLE_TYPE));
   }
 
@@ -398,33 +409,6 @@ public class StreamWriteOperatorCoordinator
   }
 
   /**
-   * The coordinator reuses the instant if there is no data for this round of 
checkpoint,
-   * sends the commit ack events to unblock the flushing.
-   */
-  private void sendCommitAckEvents(long checkpointId) {
-    CompletableFuture<?>[] futures = 
Arrays.stream(this.gateways).filter(Objects::nonNull)
-        .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
-        .toArray(CompletableFuture<?>[]::new);
-    try {
-      CompletableFuture.allOf(futures).get();
-    } catch (Throwable throwable) {
-      if (!sendToFinishedTasks(throwable)) {
-        throw new HoodieException("Error while waiting for the commit ack 
events to finish sending", throwable);
-      }
-    }
-  }
-
-  /**
-   * Decides whether the given exception is caused by sending events to 
FINISHED tasks.
-   *
-   * <p>Ugly impl: the exception may change in the future.
-   */
-  private static boolean sendToFinishedTasks(Throwable throwable) {
-    return throwable.getCause() instanceof TaskNotRunningException
-        || throwable.getCause().getMessage().contains("running");
-  }
-
-  /**
    * Commits the instant.
    */
   private void commitInstant(String instant) {
@@ -451,8 +435,7 @@ public class StreamWriteOperatorCoordinator
     if (writeResults.size() == 0) {
       // No data has written, reset the buffer and returns early
       reset();
-      // Send commit ack event to the write function to unblock the flushing
-      sendCommitAckEvents(checkpointId);
+      messageDriver.commitCkp(checkpointId, this.instant, this.instant);
       return false;
     }
     doCommit(instant, writeResults);
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index f3cfbae..f5fda5a 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -21,11 +21,13 @@ package org.apache.hudi.sink.bulk;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.message.MessageBus;
+import org.apache.hudi.sink.message.MessageClient;
 import org.apache.hudi.sink.utils.TimeWait;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -38,6 +40,8 @@ import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -80,24 +84,19 @@ public class BulkInsertWriteFunction<I>
   private int taskID;
 
   /**
-   * Meta Client.
-   */
-  private transient HoodieTableMetaClient metaClient;
-
-  /**
    * Write Client.
    */
   private transient HoodieFlinkWriteClient writeClient;
 
   /**
-   * The initial inflight instant when start up.
+   * Gateway to send operator events to the operator coordinator.
    */
-  private volatile String initInstant;
+  private transient OperatorEventGateway eventGateway;
 
   /**
-   * Gateway to send operator events to the operator coordinator.
+   * The message client.
    */
-  private transient OperatorEventGateway eventGateway;
+  private MessageClient messageClient;
 
   /**
    * Constructs a StreamingSinkFunction.
@@ -112,9 +111,8 @@ public class BulkInsertWriteFunction<I>
   @Override
   public void open(Configuration parameters) throws IOException {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
-    this.metaClient = StreamerUtil.createMetaClient(this.config);
     this.writeClient = StreamerUtil.createWriteClient(this.config, 
getRuntimeContext());
-    this.initInstant = StreamerUtil.getLastPendingInstant(this.metaClient, 
false);
+    this.messageClient = 
MessageBus.getClient(config.getString(FlinkOptions.PATH));
     sendBootstrapEvent();
     initWriterHelper();
   }
@@ -130,6 +128,9 @@ public class BulkInsertWriteFunction<I>
       this.writeClient.cleanHandlesGracefully();
       this.writeClient.close();
     }
+    if (this.messageClient != null) {
+      this.messageClient.close();
+    }
   }
 
   /**
@@ -183,8 +184,17 @@ public class BulkInsertWriteFunction<I>
     LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", 
taskID);
   }
 
+  /**
+   * Returns the next instant to write from the message bus.
+   */
+  @Nullable
+  private String ackInstant() {
+    Option<MessageBus.CkpMessage> ckpMessageOption = 
this.messageClient.getCkpMessage(MessageBus.INITIAL_CKP_ID);
+    return ckpMessageOption.map(message -> 
message.inflightInstant).orElse(null);
+  }
+
   private String instantToWrite() {
-    String instant = StreamerUtil.getLastPendingInstant(this.metaClient);
+    String instant = ackInstant();
     // if exactly-once semantics turns on,
     // waits for the checkpoint notification until the checkpoint timeout 
threshold hits.
     TimeWait timeWait = TimeWait.builder()
@@ -192,14 +202,14 @@ public class BulkInsertWriteFunction<I>
         .action("instant initialize")
         .throwsT(true)
         .build();
-    while (instant == null || instant.equals(this.initInstant)) {
+    while (instant == null) {
       // wait condition:
       // 1. there is no inflight instant
       // 2. the inflight instant does not change
       // sleep for a while
       timeWait.waitFor();
       // refresh the inflight instant
-      instant = StreamerUtil.getLastPendingInstant(this.metaClient);
+      instant = ackInstant();
     }
     return instant;
   }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index 5ad2935..c3fcec0 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -21,11 +21,14 @@ package org.apache.hudi.sink.common;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.message.MessageBus;
+import org.apache.hudi.sink.message.MessageClient;
 import org.apache.hudi.sink.utils.TimeWait;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -39,12 +42,14 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.CollectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 
 /**
  * Base infrastructures for streaming writer function.
@@ -120,6 +125,11 @@ public abstract class AbstractStreamWriteFunction<I>
   private long checkpointId = -1;
 
   /**
+   * The message client.
+   */
+  private MessageClient messageClient;
+
+  /**
    * Constructs a StreamWriteFunctionBase.
    *
    * @param config The config options
@@ -140,7 +150,6 @@ public abstract class AbstractStreamWriteFunction<I>
             TypeInformation.of(WriteMetadataEvent.class)
         ));
 
-    this.currentInstant = lastPendingInstant();
     if (context.isRestored()) {
       restoreWriteMetadata();
     } else {
@@ -148,6 +157,7 @@ public abstract class AbstractStreamWriteFunction<I>
     }
     // blocks flushing until the coordinator starts a new instant
     this.confirming = true;
+    this.messageClient = MessageBus.getClient(this.metaClient.getFs(), 
this.metaClient.getBasePath());
   }
 
   @Override
@@ -177,14 +187,19 @@ public abstract class AbstractStreamWriteFunction<I>
   // -------------------------------------------------------------------------
 
   private void restoreWriteMetadata() throws Exception {
-    String lastInflight = lastPendingInstant();
+    List<WriteMetadataEvent> events = 
CollectionUtil.iterableToList(this.writeMetadataState.get());
     boolean eventSent = false;
-    for (WriteMetadataEvent event : this.writeMetadataState.get()) {
-      if (Objects.equals(lastInflight, event.getInstantTime())) {
-        // The checkpoint succeed but the meta does not commit,
-        // re-commit the inflight instant
-        this.eventGateway.sendEventToCoordinator(event);
-        LOG.info("Send uncommitted write metadata event to coordinator, 
task[{}].", taskID);
+    if (events.size() > 0) {
+      boolean committed = this.metaClient.getActiveTimeline()
+          .filterCompletedInstants()
+          .containsInstant(events.get(0).getInstantTime());
+      if (!committed) {
+        for (WriteMetadataEvent event : events) {
+          // The checkpoint succeed but the meta does not commit,
+          // re-commit the inflight instant
+          this.eventGateway.sendEventToCoordinator(event);
+          LOG.info("Send uncommitted write metadata event to coordinator, 
task[{}].", taskID);
+        }
         eventSent = true;
       }
     }
@@ -222,21 +237,65 @@ public abstract class AbstractStreamWriteFunction<I>
     }
   }
 
+  @Override
+  public void close() {
+    if (this.messageClient != null) {
+      this.messageClient.close();
+    }
+  }
+
   /**
    * Returns the last pending instant time.
    */
-  protected String lastPendingInstant() {
-    return StreamerUtil.getLastPendingInstant(this.metaClient);
+  private String lastPendingInstant() {
+    return StreamerUtil.getLastPendingInstant(metaClient);
+  }
+
+  /**
+   * Returns the previous committed checkpoint id.
+   *
+   * @param eagerFlush Whether the data flush happens before the checkpoint 
barrier arrives
+   */
+  private long prevCkp(boolean eagerFlush) {
+    // Use the last checkpoint id to request for the message,
+    // the time sequence of committed checkpoints and ongoing
+    // checkpoints are as following:
+
+    // 0 ------------ 1 ------------ 2 ------------ 3 ------------>   
committed ckp id
+    // |             /              /              /              /
+    // |--- ckp-1 ----|--- ckp-2 ----|--- ckp-3 ----|--- ckp-4 ----|  ongoing 
ckp id
+
+    // Use 0 as the initial committed checkpoint id, the 0th checkpoint 
message records the writing instant for ckp-1;
+    // when ckp-1 success event is received, commits a checkpoint message with 
the writing instant for ckp-2;
+    // that means, the checkpoint message records the writing instant of next 
checkpoint.
+    return Math.max(0, eagerFlush ? this.checkpointId : this.checkpointId - 1);
+  }
+
+  /**
+   * Returns the next instant to write from the message bus.
+   *
+   * <p>It returns 3 kinds of value:
+   * i) normal instant time: the previous checkpoint succeeded;
+   * ii) 'aborted' instant time: the previous checkpoint has been aborted;
+   * iii) null: the checkpoint is still ongoing without any notifications.
+   */
+  @Nullable
+  protected String ackInstant(long checkpointId) {
+    Option<MessageBus.CkpMessage> ckpMessageOption = 
this.messageClient.getCkpMessage(checkpointId);
+    return ckpMessageOption.map(message -> 
message.inflightInstant).orElse(null);
   }
 
   /**
    * Prepares the instant time to write with for next checkpoint.
    *
-   * @param hasData Whether the task has buffering data
+   * @param eagerFlush Whether the data flush happens before the checkpoint 
barrier arrives
+   *
    * @return The instant time
    */
-  protected String instantToWrite(boolean hasData) {
-    String instant = lastPendingInstant();
+  protected String instantToWrite(boolean eagerFlush) {
+    final long ckpId = prevCkp(eagerFlush);
+    String instant = ackInstant(ckpId);
+
     // if exactly-once semantics turns on,
     // waits for the checkpoint notification until the checkpoint timeout 
threshold hits.
     TimeWait timeWait = TimeWait.builder()
@@ -247,18 +306,23 @@ public abstract class AbstractStreamWriteFunction<I>
       // wait condition:
       // 1. there is no inflight instant
       // 2. the inflight instant does not change and the checkpoint has 
buffering data
-      if (instant == null || (instant.equals(this.currentInstant) && hasData)) 
{
+      if (instant == null) {
         // sleep for a while
         boolean timeout = timeWait.waitFor();
-        if (timeout && instant != null) {
+        if (timeout && MessageBus.notInitialCkp(ckpId)) {
+          // if the timeout threshold hits but the last instant still has not committed,
+          // and the task does not receive a commit ack event (no data or aborted checkpoint),
           // assumes the checkpoint was canceled silently and unblock the data 
flushing
           confirming = false;
+          instant = lastPendingInstant();
         } else {
           // refresh the inflight instant
-          instant = lastPendingInstant();
+          instant = ackInstant(ckpId);
         }
+      } else if (MessageBus.canAbort(instant, ckpId)) {
+        // the checkpoint was canceled, reuse the last instant
+        confirming = false;
+        instant = lastPendingInstant();
       } else {
         // the pending instant changed, that means the last instant was 
committed
         // successfully.
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java
new file mode 100644
index 0000000..ff8f3eb
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.message;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A message bus for transferring the checkpoint messages.
+ *
+ * <p>Each time the driver starts a new instant, it writes a commit message into the bus;
+ * the write tasks then consume the message and unblock the data flush.
+ *
+ * <p>Why do we use a DFS-based message queue instead of sending
+ * the {@link org.apache.flink.runtime.operators.coordination.OperatorEvent}?
+ * The write task handles operator events using the main mailbox executor, which has the lowest priority for mails
+ * and is also used to process the inputs. When the write task blocks and waits for the operator event that acks the valid instant to write,
+ * it actually blocks all the following events in the mailbox, so the operator event can never be consumed, which causes a deadlock.
+ *
+ * <p>The message bus is also more lightweight than the active timeline.
+ */
+public abstract class MessageBus implements AutoCloseable {
+
+  public static final long INITIAL_CKP_ID = 0L;
+
+  public static final String ABORTED_CKP_INSTANT = "aborted";
+
+  protected static final int MESSAGE_QUEUE_LENGTH = 20;
+
+  protected static final int CLIENT_MESSAGE_CACHE_SIZE = 10;
+
+  private static final String MESSAGE_BUS = "message_bus";
+
+  private static final String COMMIT = "commit";
+
+  private static final String COMMIT_EXTENSION = "." + COMMIT;
+  private static final String ABORTED_EXTENSION = ".aborted";
+
+  protected final FileSystem fs;
+  protected final String basePath;
+  protected final String messageBusPath;
+
+  protected MessageBus(FileSystem fs, String basePath) {
+    this.fs = fs;
+    this.basePath = basePath;
+    this.messageBusPath = messageBusPath(basePath);
+  }
+
+  public static MessageDriver getDriver(FileSystem fs, String basePath) {
+    return MessageDriver.getInstance(fs, basePath);
+  }
+
+  public static MessageClient getClient(FileSystem fs, String basePath) {
+    return MessageClient.getSingleton(fs, basePath);
+  }
+
+  public static MessageClient getClient(String basePath) {
+    FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf());
+    return MessageClient.getSingleton(fs, basePath);
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+  public static boolean canAbort(String instant, long checkpointId) {
+    return ABORTED_CKP_INSTANT.equals(instant) && 
MessageBus.notInitialCkp(checkpointId);
+  }
+
+  public static boolean notInitialCkp(long checkpointId) {
+    return checkpointId != INITIAL_CKP_ID;
+  }
+
+  protected Path fullFilePath(String fileName) {
+    return new Path(messageBusPath, fileName);
+  }
+
+  protected static String messageBusPath(String basePath) {
+    return basePath + Path.SEPARATOR + 
HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + MESSAGE_BUS;
+  }
+
+  protected static String getCommitFileName(long checkpointId) {
+    return checkpointId + COMMIT_EXTENSION;
+  }
+
+  protected static String getAbortedFileName(long checkpointId) {
+    return checkpointId + ABORTED_EXTENSION;
+  }
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
+  /**
+   * A checkpoint message.
+   */
+  public static class CkpMessage {
+    private static final String SEPARATOR = ",";
+
+    public final boolean committed; // whether the checkpoint is committed
+
+    public final long checkpointId;
+    public final String commitInstant;
+    public final String inflightInstant;
+
+    private CkpMessage(long checkpointId, String commitInstant, String 
inflightInstant) {
+      this.committed = true;
+      this.checkpointId = checkpointId;
+      this.commitInstant = commitInstant;
+      this.inflightInstant = inflightInstant;
+    }
+
+    private CkpMessage(long checkpointId) {
+      this.committed = false;
+      this.checkpointId = checkpointId;
+      this.commitInstant = ABORTED_CKP_INSTANT;
+      this.inflightInstant = ABORTED_CKP_INSTANT;
+    }
+
+    /**
+     * Encodes the instants as 'commitInstant,inflightInstant'.
+     */
+    public static byte[] toBytes(String commitInstant, String inflightInstant) 
{
+      return (commitInstant + SEPARATOR + 
inflightInstant).getBytes(StandardCharsets.UTF_8);
+    }
+
+    public static CkpMessage fromBytes(long checkpointId, byte[] bytes) {
+      String content = new String(bytes, StandardCharsets.UTF_8);
+      String[] splits = content.split(SEPARATOR);
+      return new CkpMessage(checkpointId, splits[0], splits[1]);
+    }
+
+    public static CkpMessage fromPath(FileSystem fs, Path path) throws 
IOException {
+      final String[] splits = path.getName().split("\\.");
+      ValidationUtils.checkState(splits.length == 2, "Invalid checkpoint 
message file name: " + path.getName());
+      final long checkpointId = Long.parseLong(splits[0]);
+      final String suffix = splits[1];
+      if (suffix.equals(COMMIT)) {
+        try (FSDataInputStream is = fs.open(path)) {
+          byte[] bytes = FileIOUtils.readAsByteArray(is);
+          return CkpMessage.fromBytes(checkpointId, bytes);
+        }
+      } else {
+        return new CkpMessage(checkpointId);
+      }
+    }
+  }
+}
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java
new file mode 100644
index 0000000..ea893d5
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.message;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A client that consumes messages from the {@link MessageBus}.
+ */
+public class MessageClient extends MessageBus {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MessageClient.class);
+
+  private static final Map<String, MessageClient> CLIENTS = new HashMap<>();
+
+  private final TreeMap<Long, CkpMessage> ckpCache; // checkpoint id -> 
CkpMessage mapping
+
+  private MessageClient(FileSystem fs, String basePath) throws IOException {
+    super(fs, basePath);
+    this.ckpCache = new TreeMap<>();
+  }
+
+  /**
+   * Returns the message bus instance.
+   *
+   * <p>This expects to be called by the client.
+   *
+   * @param fs       The filesystem
+   * @param basePath The table base path
+   * @return The instance of message bus
+   */
+  private static MessageClient getInstance(FileSystem fs, String basePath) {
+    try {
+      return new MessageClient(fs, basePath);
+    } catch (IOException e) {
+      throw new HoodieException("Initialize checkpoint message bus error", e);
+    }
+  }
+
+  /**
+   * Returns the singleton message bus instance.
+   *
+   * <p>This expects to be called by the client.
+   *
+   * @param fs       The filesystem
+   * @param basePath The table base path
+   * @return The instance of message bus
+   */
+  public static synchronized MessageClient getSingleton(FileSystem fs, String 
basePath) {
+    return CLIENTS.computeIfAbsent(basePath,
+        k -> getInstance(fs, basePath));
+  }
+
+  public synchronized Option<CkpMessage> getCkpMessage(long checkpointId) {
+    if (this.ckpCache.size() >= CLIENT_MESSAGE_CACHE_SIZE) {
+      this.ckpCache.pollFirstEntry();
+    }
+    if (this.ckpCache.containsKey(checkpointId)) {
+      return Option.of(this.ckpCache.get(checkpointId));
+    }
+    final Path commitFilePath = fullFilePath(getCommitFileName(checkpointId));
+    try {
+      if (fs.exists(commitFilePath)) {
+        CkpMessage ckpMessage = CkpMessage.fromPath(fs, commitFilePath);
+        this.ckpCache.put(checkpointId, ckpMessage);
+        return Option.of(ckpMessage);
+      }
+    } catch (Throwable e) {
+      // ignored
+      LOG.warn("Read committed checkpoint message error: " + checkpointId, e);
+      return Option.empty();
+    }
+    final Path abortedFilePath = 
fullFilePath(getAbortedFileName(checkpointId));
+    try {
+      if (fs.exists(abortedFilePath)) {
+        CkpMessage ckpMessage = CkpMessage.fromPath(fs, abortedFilePath);
+        this.ckpCache.put(checkpointId, ckpMessage);
+        return Option.of(ckpMessage);
+      }
+    } catch (Throwable e) {
+      // ignored
+      LOG.warn("Read aborted checkpoint message error: " + checkpointId, e);
+      return Option.empty();
+    }
+    return Option.empty();
+  }
+
+  @VisibleForTesting
+  public TreeMap<Long, CkpMessage> getCkpCache() {
+    return ckpCache;
+  }
+
+  @Override
+  public void close() {
+    synchronized (this) {
+      this.ckpCache.clear();
+    }
+  }
+}
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java
new file mode 100644
index 0000000..bf98209
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.message;
+
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A driver that produces messages to the {@link MessageBus}.
+ */
+public class MessageDriver extends MessageBus {
+  private final TreeMap<Long, Boolean> ckpIdCache; // checkpoint id -> 
isCommitted mapping
+
+  public MessageDriver(FileSystem fs, String basePath) throws IOException {
+    super(fs, basePath);
+    this.ckpIdCache = new TreeMap<>();
+    initialize();
+  }
+
+  /**
+   * Returns the message bus instance.
+   *
+   * <p>This expects to be called by the driver.
+   *
+   * @param fs       The filesystem
+   * @param basePath The table base path
+   * @return The instance of message bus
+   */
+  public static MessageDriver getInstance(FileSystem fs, String basePath) {
+    try {
+      return new MessageDriver(fs, basePath);
+    } catch (IOException e) {
+      throw new HoodieException("Initialize checkpoint message bus error", e);
+    }
+  }
+
+  /**
+   * Initialize the message bus, would clean all the messages.
+   *
+   * <p>This expects to be called by the driver.
+   */
+  private void initialize() throws IOException {
+    Path path = new Path(messageBusPath(basePath));
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+    fs.mkdirs(path);
+  }
+
+  /**
+   * Add a checkpoint commit message.
+   *
+   * @param checkpointId    The checkpoint id
+   * @param commitInstant   The committed instant
+   * @param inflightInstant The new inflight instant
+   */
+  public void commitCkp(long checkpointId, String commitInstant, String 
inflightInstant) {
+    Path path = fullFilePath(getCommitFileName(checkpointId));
+
+    try (FSDataOutputStream outputStream = fs.create(path, true)) {
+      byte[] bytes = CkpMessage.toBytes(commitInstant, inflightInstant);
+      outputStream.write(bytes);
+      outputStream.close();
+      this.ckpIdCache.put(checkpointId, true);
+      clean();
+    } catch (Throwable e) {
+      throw new HoodieException("Adding committed message error for 
checkpoint: " + checkpointId, e);
+    }
+  }
+
+  /**
+   * Add an aborted checkpoint message.
+   *
+   * @param checkpointId    The checkpoint id
+   */
+  public void abortCkp(long checkpointId) {
+    Path path = fullFilePath(getAbortedFileName(checkpointId));
+    try {
+      fs.createNewFile(path);
+      this.ckpIdCache.put(checkpointId, false);
+      clean();
+    } catch (Throwable e) {
+      throw new HoodieException("Adding aborted message error for checkpoint: 
" + checkpointId, e);
+    }
+  }
+
+  private void clean() throws IOException {
+    int numToClean = this.ckpIdCache.size() - MESSAGE_QUEUE_LENGTH;
+    if (numToClean >= 10) {
+      for (int i = 0; i < numToClean; i++) {
+        Map.Entry<Long, Boolean> entry = this.ckpIdCache.pollFirstEntry();
+        final String fileName = entry.getValue() ? 
getCommitFileName(entry.getKey()) : getAbortedFileName(entry.getKey());
+        final Path filePath = fullFilePath(fileName);
+        fs.delete(filePath, false);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public TreeMap<Long, Boolean> getCkpIdCache() {
+    return ckpIdCache;
+  }
+
+  @Override
+  public void close() throws Exception {
+    this.ckpIdCache.clear();
+  }
+}
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/message/TestMessageBus.java 
b/hudi-flink/src/test/java/org/apache/hudi/sink/message/TestMessageBus.java
new file mode 100644
index 0000000..b161c96
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/message/TestMessageBus.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.message;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/**
 * Test cases for {@link MessageBus}: the {@link MessageDriver} produces the
 * checkpoint messages and the {@link MessageClient} reads them back.
 */
public class TestMessageBus {

  private String basePath;
  private FileSystem fs;

  // The producer side of the message bus, re-created before each test case.
  private MessageDriver driver;

  @TempDir
  File tempFile;

  @BeforeEach
  public void beforeEach() throws Exception {
    basePath = tempFile.getAbsolutePath();
    this.fs = FSUtils.getFs(tempFile.getAbsolutePath(), StreamerUtil.getHadoopConf());

    Configuration conf = TestConfigurations.getDefaultConf(basePath);
    StreamerUtil.initTableIfNotExists(conf);

    this.driver = MessageDriver.getInstance(fs, basePath);
  }

  @Test
  void testWriteAndReadMessage() {
    MessageClient client = MessageClient.getSingleton(fs, basePath);

    // write and read 5 committed checkpoints
    IntStream.range(0, 5).forEach(i -> driver.commitCkp(i, i + "", i + 1 + ""));

    IntStream.range(0, 5).forEach(i -> {
      Option<MessageBus.CkpMessage> messageOpt = client.getCkpMessage(i);
      assertTrue(messageOpt.isPresent());

      MessageBus.CkpMessage ckpMessage = messageOpt.get();
      assertTrue(ckpMessage.committed);
      assertThat(ckpMessage.commitInstant, is(i + ""));
      assertThat(ckpMessage.inflightInstant, is(i + 1 + ""));
    });

    // write and read 5 aborted checkpoints
    IntStream.range(5, 10).forEach(i -> driver.abortCkp(i));

    IntStream.range(5, 10).forEach(i -> {
      Option<MessageBus.CkpMessage> messageOpt = client.getCkpMessage(i);
      assertTrue(messageOpt.isPresent());

      MessageBus.CkpMessage ckpMessage = messageOpt.get();
      assertFalse(ckpMessage.committed);
      // aborted messages carry the placeholder instant in both fields
      assertThat(ckpMessage.commitInstant, is(MessageBus.ABORTED_CKP_INSTANT));
      assertThat(ckpMessage.inflightInstant, is(MessageBus.ABORTED_CKP_INSTANT));
    });
  }

  @Test
  void testWriteCleaning() {
    // write 20 committed checkpoints
    IntStream.range(0, 20).forEach(i -> driver.commitCkp(i, i + "", i + 1 + ""));
    assertThat("The id cache should not be cleaned", driver.getCkpIdCache().size(), is(20));

    // write 9 aborted checkpoints (ids 20..28); overflow is still below the batch threshold
    IntStream.range(20, 29).forEach(i -> driver.abortCkp(i));
    assertThat("The id cache should not be cleaned", driver.getCkpIdCache().size(), is(29));

    // the 30th message pushes the overflow to 10, which triggers the batch cleaning
    driver.commitCkp(29, "29", "30");
    assertThat("The cache should be cleaned", driver.getCkpIdCache().size(), is(20));
    assertThat(longSet2String(driver.getCkpIdCache().keySet()),
        is("10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29"));
  }

  @Test
  void testReadCleaning() {
    MessageClient client = MessageClient.getSingleton(fs, basePath);

    // write 20 committed checkpoints
    IntStream.range(0, 20).forEach(i -> driver.commitCkp(i, i + "", i + 1 + ""));

    // reading 10 messages fills the client cache exactly to its capacity
    IntStream.range(0, 10).forEach(client::getCkpMessage);
    assertThat("The checkpoint cache should not be cleaned", client.getCkpCache().size(), is(10));

    // every further read evicts the oldest cached message, keeping the size fixed
    client.getCkpMessage(10);
    assertThat("The checkpoint cache should be cleaned", client.getCkpCache().size(), is(10));

    IntStream.range(11, 15).forEach(client::getCkpMessage);
    assertThat("The checkpoint cache should be cleaned", client.getCkpCache().size(), is(10));
    assertThat(longSet2String(client.getCkpCache().keySet()), is("5,6,7,8,9,10,11,12,13,14"));
  }

  // Renders the set of checkpoint ids as a comma-separated string for assertions.
  private static String longSet2String(Set<Long> longSet) {
    List<String> elements = new ArrayList<>();
    longSet.stream().mapToInt(Long::intValue).forEach(i -> elements.add(i + ""));
    return String.join(",", elements);
  }
}
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 54a142a..f1f5a1f 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -248,6 +248,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
  public void close() throws Exception {
    coordinator.close();
    ioManager.close();
    // also close the write function so its close() hook releases the
    // resources it holds (added together with the DFS message bus)
    writeFunction.close();
  }
 
   public StreamWriteOperatorCoordinator getCoordinator() {

Reply via email to