This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c369c6  [HUDI-1757] Assigns the buckets by record key for Flink 
writer (#2757)
9c369c6 is described below

commit 9c369c607df2816ea2cd1221fb6d879e3fb8f74c
Author: Danny Chan <[email protected]>
AuthorDate: Tue Apr 6 19:06:41 2021 +0800

    [HUDI-1757] Assigns the buckets by record key for Flink writer (#2757)
    
    Currently we assign the buckets by record partition path which could
    cause hotspot if the partition field is datetime type. Changes to assign
    buckets by grouping the records with their key first; the assignment is
    valid only if there is no conflict (two tasks write to the same bucket).
    
    This patch also changes the coordinator execution to be asynchronous.
---
 .../org/apache/hudi/index/HoodieIndexUtils.java    |   1 -
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  12 +-
 .../apache/hudi/common/util/ReflectionUtils.java   |   2 +-
 .../apache/hudi/configuration/FlinkOptions.java    |   8 +-
 .../java/org/apache/hudi/sink/CleanFunction.java   |   8 +-
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  48 +---
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 254 +++++++++------------
 .../apache/hudi/sink/compact/CompactFunction.java  |  42 +++-
 .../hudi/sink/compact/CompactionCommitSink.java    |  52 ++---
 .../sink/partitioner/BucketAssignFunction.java     |  90 ++------
 .../hudi/sink/partitioner/BucketAssigner.java      |  25 +-
 .../hudi/sink/partitioner/BucketAssigners.java     |   8 +-
 .../partitioner/delta/DeltaBucketAssigner.java     |  20 +-
 .../sink/transform/RowDataToHoodieFunction.java    |  75 +++++-
 .../hudi/sink/utils/CoordinatorExecutor.java       |  57 +++++
 .../apache/hudi/sink/utils/NonThrownExecutor.java  |  22 +-
 .../org/apache/hudi/table/HoodieTableSink.java     |   4 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  20 +-
 .../org/apache/hudi/sink/StreamWriteITCase.java    |  12 +-
 .../sink/TestStreamWriteOperatorCoordinator.java   |  68 ++++--
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |  23 +-
 .../hudi/sink/partitioner/TestBucketAssigner.java  | 114 ++++++++-
 .../hudi/sink/utils/CompactFunctionWrapper.java    |   5 +
 .../hudi/sink/utils/MockCoordinatorExecutor.java   |  51 +++++
 .../sink/utils/StreamWriteFunctionWrapper.java     |  15 +-
 25 files changed, 637 insertions(+), 399 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 49c81b7..e5426ca 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -41,7 +41,6 @@ public class HoodieIndexUtils {
    * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested 
partitions.
    *
    * @param partition   Partition of interest
-   * @param context     Instance of {@link HoodieEngineContext} to use
    * @param hoodieTable Instance of {@link HoodieTable} of interest
    * @return the list of {@link HoodieBaseFile}
    */
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 6a6bced..7eeec4c 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -260,11 +260,17 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
    * but cleaning action should trigger after all the write actions within a
    * checkpoint finish.
    *
-   * @param instantTime The latest successful commit time
+   * @param table         Table to commit on
+   * @param metadata      Commit Metadata corresponding to committed instant
+   * @param instantTime   Instant Time
+   * @param extraMetadata Additional Metadata passed by user
    */
-  public void postCommit(String instantTime) {
+  @Override
+  protected void postCommit(HoodieTable<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> table,
+                            HoodieCommitMetadata metadata,
+                            String instantTime,
+                            Option<Map<String, String>> extraMetadata) {
     try {
-      HoodieTable<?, ?, ?, ?> table = createTable(config, hadoopConf);
       // Delete the marker directory for the instant.
       new MarkerFiles(createTable(config, hadoopConf), instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
index 23a87e7..bb30b2a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
@@ -47,7 +47,7 @@ public class ReflectionUtils {
 
   private static Map<String, Class<?>> clazzCache = new HashMap<>();
 
-  private static Class<?> getClass(String clazzName) {
+  public static Class<?> getClass(String clazzName) {
     if (!clazzCache.containsKey(clazzName)) {
       try {
         Class<?> clazz = Class.forName(clazzName);
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java 
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 8691d91..b66be2b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -255,7 +255,7 @@ public class FlinkOptions {
   public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
       .key("write.batch.size.MB")
       .doubleType()
-      .defaultValue(128D) // 128MB
+      .defaultValue(2D) // 2MB
       .withDescription("Batch buffer size in MB to flush data into the 
underneath filesystem");
 
   // ------------------------------------------------------------------------
@@ -294,6 +294,12 @@ public class FlinkOptions {
       .defaultValue(3600) // default 1 hour
       .withDescription("Max delta seconds time needed to trigger compaction, 
default 1 hour");
 
+  public static final ConfigOption<Integer> COMPACTION_MAX_MEMORY = 
ConfigOptions
+      .key("compaction.max_memory")
+      .intType()
+      .defaultValue(100) // default 100 MB
+      .withDescription("Max memory in MB for compaction spillable map, default 
100MB");
+
   public static final ConfigOption<Boolean> CLEAN_ASYNC_ENABLED = ConfigOptions
       .key("clean.async.enabled")
       .booleanType()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 7b875ff..14dd827 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -30,6 +30,8 @@ import 
org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Sink function that cleans the old commits.
@@ -40,6 +42,8 @@ import 
org.apache.flink.streaming.api.functions.sink.SinkFunction;
  */
 public class CleanFunction<T> extends AbstractRichFunction
     implements SinkFunction<T>, CheckpointedFunction, CheckpointListener {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CleanFunction.class);
+
   private final Configuration conf;
 
   private HoodieFlinkWriteClient writeClient;
@@ -56,7 +60,7 @@ public class CleanFunction<T> extends AbstractRichFunction
     super.open(parameters);
     if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
       this.writeClient = StreamerUtil.createWriteClient(conf, 
getRuntimeContext());
-      this.executor = new NonThrownExecutor();
+      this.executor = new NonThrownExecutor(LOG);
     }
   }
 
@@ -70,7 +74,7 @@ public class CleanFunction<T> extends AbstractRichFunction
           // ensure to switch the isCleaning flag
           this.isCleaning = false;
         }
-      }, "wait for cleaning finish", "");
+      }, "wait for cleaning finish");
     }
   }
 
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 364d28e..b0321ac 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -31,9 +31,9 @@ import org.apache.hudi.table.action.commit.FlinkWriteHelper;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -50,8 +50,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiFunction;
 
 /**
@@ -104,21 +102,6 @@ public class StreamWriteFunction<K, I, O>
   private transient Map<String, DataBucket> buckets;
 
   /**
-   * The buffer lock to control data buffering/flushing.
-   */
-  private transient ReentrantLock bufferLock;
-
-  /**
-   * The condition to decide whether to add new records into the buffer.
-   */
-  private transient Condition addToBufferCondition;
-
-  /**
-   * Flag saying whether there is an on-going checkpoint.
-   */
-  private volatile boolean onCheckpointing = false;
-
-  /**
    * Config options.
    */
   private final Configuration config;
@@ -169,32 +152,15 @@ public class StreamWriteFunction<K, I, O>
 
   @Override
   public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
-    bufferLock.lock();
-    try {
-      // Based on the fact that the coordinator starts the checkpoint first,
-      // it would check the validity.
-      this.onCheckpointing = true;
-      // wait for the buffer data flush out and request a new instant
-      flushRemaining(false);
-      // signal the task thread to start buffering
-      addToBufferCondition.signal();
-    } finally {
-      this.onCheckpointing = false;
-      bufferLock.unlock();
-    }
+    // Based on the fact that the coordinator starts the checkpoint first,
+    // it would check the validity.
+    // wait for the buffer data flush out and request a new instant
+    flushRemaining(false);
   }
 
   @Override
   public void processElement(I value, KeyedProcessFunction<K, I, O>.Context 
ctx, Collector<O> out) throws Exception {
-    bufferLock.lock();
-    try {
-      if (onCheckpointing) {
-        addToBufferCondition.await();
-      }
-      bufferRecord(value);
-    } finally {
-      bufferLock.unlock();
-    }
+    bufferRecord(value);
   }
 
   @Override
@@ -247,8 +213,6 @@ public class StreamWriteFunction<K, I, O>
 
   private void initBuffer() {
     this.buckets = new LinkedHashMap<>();
-    this.bufferLock = new ReentrantLock();
-    this.addToBufferCondition = this.bufferLock.newCondition();
   }
 
   private void initWriteFunction() {
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 51149e2..54a7603 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -26,27 +26,20 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
+import org.apache.hudi.sink.utils.CoordinatorExecutor;
 import org.apache.hudi.sink.utils.HiveSyncContext;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.util.Preconditions;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -64,7 +57,7 @@ import static 
org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
  * <p>This coordinator starts a new instant when a new checkpoint starts. It 
commits the instant when all the
  * operator tasks write the buffer successfully for a round of checkpoint.
  *
- * <p>If there is no data for a round of checkpointing, it rolls back the 
metadata.
+ * <p>If there is no data for a round of checkpointing, it resets the events 
buffer and returns early.
  *
  * @see StreamWriteFunction for the work flow and semantics
  */
@@ -78,19 +71,19 @@ public class StreamWriteOperatorCoordinator
   private final Configuration conf;
 
   /**
-   * Write client.
+   * Coordinator context.
    */
-  private transient HoodieFlinkWriteClient writeClient;
+  private final Context context;
 
   /**
-   * Current data buffering checkpoint.
+   * Write client.
    */
-  private long inFlightCheckpoint = -1;
+  private transient HoodieFlinkWriteClient writeClient;
 
   /**
    * Current REQUESTED instant, for validation.
    */
-  private String instant = "";
+  private volatile String instant = "";
 
   /**
    * Event buffer for one round of checkpointing. When all the elements are 
non-null and have the same
@@ -111,7 +104,12 @@ public class StreamWriteOperatorCoordinator
   /**
    * A single-thread executor to handle all the asynchronous jobs of the 
coordinator.
    */
-  private NonThrownExecutor executor;
+  private CoordinatorExecutor executor;
+
+  /**
+   * A single-thread executor to handle asynchronous hive sync.
+   */
+  private NonThrownExecutor hiveSyncExecutor;
 
   /**
    * Context that holds variables for asynchronous hive sync.
@@ -121,14 +119,15 @@ public class StreamWriteOperatorCoordinator
   /**
    * Constructs a StreamingSinkOperatorCoordinator.
    *
-   * @param conf        The config options
-   * @param parallelism The operator task number
+   * @param conf    The config options
+   * @param context The coordinator context
    */
   public StreamWriteOperatorCoordinator(
       Configuration conf,
-      int parallelism) {
+      Context context) {
     this.conf = conf;
-    this.parallelism = parallelism;
+    this.context = context;
+    this.parallelism = context.currentParallelism();
     this.needsScheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
   }
 
@@ -142,6 +141,8 @@ public class StreamWriteOperatorCoordinator
     initTableIfNotExists(this.conf);
     // start a new instant
     startInstant();
+    // start the executor
+    this.executor = new CoordinatorExecutor(this.context, LOG);
     // start the executor if required
     if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
       initHiveSync();
@@ -162,38 +163,46 @@ public class StreamWriteOperatorCoordinator
 
   @Override
   public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> result) {
-    try {
-      this.inFlightCheckpoint = checkpointId;
-      result.complete(writeCheckpointBytes());
-    } catch (Throwable throwable) {
-      // when a checkpoint fails, throws directly.
-      result.completeExceptionally(
-          new CompletionException(
-              String.format("Failed to checkpoint Instant %s for source %s",
-                  this.instant, this.getClass().getSimpleName()), throwable));
-    }
+    executor.execute(
+        () -> {
+          try {
+            result.complete(new byte[0]);
+          } catch (Throwable throwable) {
+            // when a checkpoint fails, throws directly.
+            result.completeExceptionally(
+                new CompletionException(
+                    String.format("Failed to checkpoint Instant %s for source 
%s",
+                        this.instant, this.getClass().getSimpleName()), 
throwable));
+          }
+        }, "taking checkpoint %d", checkpointId
+    );
   }
 
   @Override
   public void notifyCheckpointComplete(long checkpointId) {
-    // start to commit the instant.
-    final String errorMsg = String.format("Instant [%s] has a complete 
checkpoint [%d],\n"
-        + "but the coordinator has not received full write success events,\n"
-        + "rolls back the instant and rethrow", this.instant, checkpointId);
-    checkAndForceCommit(errorMsg);
-    // if async compaction is on, schedule the compaction
-    if (needsScheduleCompaction) {
-      writeClient.scheduleCompaction(Option.empty());
-    }
+    executor.execute(
+        () -> {
+          // for streaming mode, commits the ever received events anyway,
+          // the stream write task snapshot and flush the data buffer 
synchronously in sequence,
+          // so a successful checkpoint subsumes the old one(follows the 
checkpoint subsuming contract)
+          final boolean committed = commitInstant();
+          if (committed) {
+            // if async compaction is on, schedule the compaction
+            if (needsScheduleCompaction) {
+              writeClient.scheduleCompaction(Option.empty());
+            }
+            // start new instant.
+            startInstant();
+          }
+        }, "commits the instant %s", this.instant
+    );
     // sync Hive if is enabled
     syncHiveIfEnabled();
-    // start new instant.
-    startInstant();
   }
 
   private void syncHiveIfEnabled() {
     if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
-      this.executor.execute(this::syncHive, "sync hive metadata", 
this.instant);
+      this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for 
instant %s", this.instant);
     }
   }
 
@@ -211,43 +220,38 @@ public class StreamWriteOperatorCoordinator
             this.conf.getString(FlinkOptions.TABLE_NAME), 
conf.getString(FlinkOptions.TABLE_TYPE));
   }
 
-  public void notifyCheckpointAborted(long checkpointId) {
-    Preconditions.checkState(inFlightCheckpoint == checkpointId,
-        "The aborted checkpoint should always be the last checkpoint");
-    checkAndForceCommit("The last checkpoint was aborted, roll back the last 
write and throw");
-  }
-
   @Override
   public void resetToCheckpoint(long checkpointID, @Nullable byte[] 
checkpointData) throws Exception {
-    if (checkpointData != null) {
-      // restore when any checkpoint completed
-      deserializeCheckpointAndRestore(checkpointData);
-    }
+    // no operation
   }
 
   @Override
   public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
-    // no event to handle
-    ValidationUtils.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
-        "The coordinator can only handle BatchWriteSuccessEvent");
-    BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent;
-    // the write task does not block after checkpointing(and before it 
receives a checkpoint success event),
-    // if it it checkpoints succeed then flushes the data buffer again before 
this coordinator receives a checkpoint
-    // success event, the data buffer would flush with an older instant time.
-    ValidationUtils.checkState(
-        HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
-        String.format("Receive an unexpected event for instant %s from task 
%d",
-            event.getInstantTime(), event.getTaskID()));
-    if (this.eventBuffer[event.getTaskID()] != null) {
-      this.eventBuffer[event.getTaskID()].mergeWith(event);
-    } else {
-      this.eventBuffer[event.getTaskID()] = event;
-    }
-    if (event.isEndInput() && checkReady()) {
-      // start to commit the instant.
-      doCommit();
-      // no compaction scheduling for batch mode
-    }
+    executor.execute(
+        () -> {
+          // no event to handle
+          ValidationUtils.checkState(operatorEvent instanceof 
BatchWriteSuccessEvent,
+              "The coordinator can only handle BatchWriteSuccessEvent");
+          BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) 
operatorEvent;
+          // the write task does not block after checkpointing(and before it 
receives a checkpoint success event),
+          // if it it checkpoints succeed then flushes the data buffer again 
before this coordinator receives a checkpoint
+          // success event, the data buffer would flush with an older instant 
time.
+          ValidationUtils.checkState(
+              HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
+              String.format("Receive an unexpected event for instant %s from 
task %d",
+                  event.getInstantTime(), event.getTaskID()));
+          if (this.eventBuffer[event.getTaskID()] != null) {
+            this.eventBuffer[event.getTaskID()].mergeWith(event);
+          } else {
+            this.eventBuffer[event.getTaskID()] = event;
+          }
+          if (event.isEndInput() && allEventsReceived()) {
+            // start to commit the instant.
+            commitInstant();
+            // no compaction scheduling for batch mode
+          }
+        }, "handle write success event for instant %s", this.instant
+    );
   }
 
   @Override
@@ -265,85 +269,31 @@ public class StreamWriteOperatorCoordinator
   // -------------------------------------------------------------------------
 
   private void initHiveSync() {
-    this.executor = new NonThrownExecutor();
+    this.hiveSyncExecutor = new NonThrownExecutor(LOG);
     this.hiveSyncContext = HiveSyncContext.create(conf);
   }
 
-  static byte[] readBytes(DataInputStream in, int size) throws IOException {
-    byte[] bytes = new byte[size];
-    in.readFully(bytes);
-    return bytes;
-  }
-
-  /**
-   * Serialize the coordinator state. The current implementation may not be 
super efficient,
-   * but it should not matter that much because most of the state should be 
rather small.
-   * Large states themselves may already be a problem regardless of how the 
serialization
-   * is implemented.
-   *
-   * @return A byte array containing the serialized state of the source 
coordinator.
-   * @throws IOException When something goes wrong in serialization.
-   */
-  private byte[] writeCheckpointBytes() throws IOException {
-    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-         DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
-
-      out.writeLong(this.inFlightCheckpoint);
-      byte[] serializedInstant = this.instant.getBytes();
-      out.writeInt(serializedInstant.length);
-      out.write(serializedInstant);
-      out.flush();
-      return baos.toByteArray();
-    }
-  }
-
-  /**
-   * Restore the state of this source coordinator from the state bytes.
-   *
-   * @param bytes The checkpoint bytes that was returned from {@link 
#writeCheckpointBytes()}
-   * @throws Exception When the deserialization failed.
-   */
-  private void deserializeCheckpointAndRestore(byte[] bytes) throws Exception {
-    try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-         DataInputStream in = new DataInputViewStreamWrapper(bais)) {
-      long checkpointID = in.readLong();
-      int serializedInstantSize = in.readInt();
-      byte[] serializedInstant = readBytes(in, serializedInstantSize);
-      this.inFlightCheckpoint = checkpointID;
-      this.instant = new String(serializedInstant);
-    }
-  }
-
   private void reset() {
-    this.instant = "";
     this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism];
   }
 
-  private void checkAndForceCommit(String errMsg) {
-    if (!checkReady()) {
-      // forced but still has inflight instant
-      String inflightInstant = 
writeClient.getInflightAndRequestedInstant(this.conf.getString(FlinkOptions.TABLE_TYPE));
-      if (inflightInstant != null) {
-        assert inflightInstant.equals(this.instant);
-        writeClient.rollback(this.instant);
-        throw new HoodieException(errMsg);
-      }
-      if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
-        // The last checkpoint finished successfully.
-        return;
-      }
-    }
-    doCommit();
-  }
-
   /** Checks the buffer is ready to commit. */
-  private boolean checkReady() {
+  private boolean allEventsReceived() {
     return Arrays.stream(eventBuffer)
         .allMatch(event -> event != null && event.isReady(this.instant));
   }
 
-  /** Performs the actual commit action. */
-  private void doCommit() {
+  /**
+   * Commits the instant.
+   *
+   * @return true if the write statuses are committed successfully.
+   */
+  private boolean commitInstant() {
+    if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
+      // The last checkpoint finished successfully.
+      return false;
+    }
+
     List<WriteStatus> writeResults = Arrays.stream(eventBuffer)
         .filter(Objects::nonNull)
         .map(BatchWriteSuccessEvent::getWriteStatuses)
@@ -351,12 +301,16 @@ public class StreamWriteOperatorCoordinator
         .collect(Collectors.toList());
 
     if (writeResults.size() == 0) {
-      // No data has written, clear the metadata file
-      
this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE),
 this.instant);
+      // No data has written, reset the buffer and returns early
       reset();
-      return;
+      return false;
     }
+    doCommit(writeResults);
+    return true;
+  }
 
+  /** Performs the actual commit action. */
+  private void doCommit(List<WriteStatus> writeResults) {
     // commit or rollback
     long totalErrorRecords = 
writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
     long totalRecords = 
writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
@@ -371,7 +325,6 @@ public class StreamWriteOperatorCoordinator
 
       boolean success = writeClient.commit(this.instant, writeResults, 
Option.of(checkpointCommitMetadata));
       if (success) {
-        writeClient.postCommit(this.instant);
         reset();
         LOG.info("Commit instant [{}] success!", this.instant);
       } else {
@@ -409,6 +362,19 @@ public class StreamWriteOperatorCoordinator
     return writeClient;
   }
 
+  @VisibleForTesting
+  public Context getContext() {
+    return context;
+  }
+
+  @VisibleForTesting
+  public void setExecutor(CoordinatorExecutor executor) throws Exception {
+    if (this.executor != null) {
+      this.executor.close();
+    }
+    this.executor = executor;
+  }
+
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
@@ -432,7 +398,7 @@ public class StreamWriteOperatorCoordinator
 
     @Override
     public OperatorCoordinator create(Context context) {
-      return new StreamWriteOperatorCoordinator(this.conf, 
context.currentParallelism());
+      return new StreamWriteOperatorCoordinator(this.conf, context);
     }
   }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
index 7f4f7b9..ee8678b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
@@ -21,13 +21,17 @@ package org.apache.hudi.sink.compact;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
 import 
org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
@@ -36,6 +40,7 @@ import java.util.List;
  * In order to execute scalable, the input should shuffle by the compact event 
{@link CompactionPlanEvent}.
  */
 public class CompactFunction extends KeyedProcessFunction<Long, 
CompactionPlanEvent, CompactionCommitEvent> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactFunction.class);
 
   /**
    * Config options.
@@ -52,6 +57,11 @@ public class CompactFunction extends 
KeyedProcessFunction<Long, CompactionPlanEv
    */
   private int taskID;
 
+  /**
+   * Executor service to execute the compaction task.
+   */
+  private transient NonThrownExecutor executor;
+
   public CompactFunction(Configuration conf) {
     this.conf = conf;
   }
@@ -60,23 +70,33 @@ public class CompactFunction extends 
KeyedProcessFunction<Long, CompactionPlanEv
   public void open(Configuration parameters) throws Exception {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
     this.writeClient = StreamerUtil.createWriteClient(conf, 
getRuntimeContext());
+    this.executor = new NonThrownExecutor(LOG);
   }
 
   @Override
   public void processElement(CompactionPlanEvent event, Context context, 
Collector<CompactionCommitEvent> collector) throws Exception {
     final String instantTime = event.getCompactionInstantTime();
     final CompactionOperation compactionOperation = event.getOperation();
+    // executes the compaction task asynchronously so as not to block the checkpoint barrier propagation.
+    executor.execute(
+        () -> {
+          HoodieFlinkMergeOnReadTableCompactor compactor = new 
HoodieFlinkMergeOnReadTableCompactor();
+          List<WriteStatus> writeStatuses = compactor.compact(
+              new HoodieFlinkCopyOnWriteTable<>(
+                  this.writeClient.getConfig(),
+                  this.writeClient.getEngineContext(),
+                  this.writeClient.getHoodieTable().getMetaClient()),
+              this.writeClient.getHoodieTable().getMetaClient(),
+              this.writeClient.getConfig(),
+              compactionOperation,
+              instantTime);
+          collector.collect(new CompactionCommitEvent(instantTime, 
writeStatuses, taskID));
+        }, "Execute compaction for instant %s from task %d", instantTime, 
taskID
+    );
+  }
 
-    HoodieFlinkMergeOnReadTableCompactor compactor = new 
HoodieFlinkMergeOnReadTableCompactor();
-    List<WriteStatus> writeStatuses = compactor.compact(
-        new HoodieFlinkCopyOnWriteTable<>(
-            this.writeClient.getConfig(),
-            this.writeClient.getEngineContext(),
-            this.writeClient.getHoodieTable().getMetaClient()),
-        this.writeClient.getHoodieTable().getMetaClient(),
-        this.writeClient.getConfig(),
-        compactionOperation,
-        instantTime);
-    collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, 
taskID));
+  @VisibleForTesting
+  public void setExecutor(NonThrownExecutor executor) {
+    this.executor = executor;
   }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 86dae20..41831cd 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -23,8 +23,6 @@ import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.sink.CleanFunction;
@@ -37,8 +35,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -67,13 +66,9 @@ public class CompactionCommitSink extends 
CleanFunction<CompactionCommitEvent> {
 
   /**
    * Buffer to collect the event from each compact task {@code 
CompactFunction}.
+   * The key is the instant time.
    */
-  private transient List<CompactionCommitEvent> commitBuffer;
-
-  /**
-   * Current on-going compaction instant time.
-   */
-  private String compactionInstantTime;
+  private transient Map<String, List<CompactionCommitEvent>> commitBuffer;
 
   public CompactionCommitSink(Configuration conf) {
     super(conf);
@@ -84,36 +79,32 @@ public class CompactionCommitSink extends 
CleanFunction<CompactionCommitEvent> {
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     this.writeClient = StreamerUtil.createWriteClient(conf, 
getRuntimeContext());
-    this.commitBuffer = new ArrayList<>();
+    this.commitBuffer = new HashMap<>();
   }
 
   @Override
   public void invoke(CompactionCommitEvent event, Context context) throws 
Exception {
-    if (compactionInstantTime == null) {
-      compactionInstantTime = event.getInstant();
-    } else if (!event.getInstant().equals(compactionInstantTime)) {
-      // last compaction still not finish, rolls it back
-      HoodieInstant inflightInstant = 
HoodieTimeline.getCompactionInflightInstant(this.compactionInstantTime);
-      writeClient.rollbackInflightCompaction(inflightInstant);
-      this.compactionInstantTime = event.getInstant();
-    }
-    this.commitBuffer.add(event);
-    commitIfNecessary();
+    final String instant = event.getInstant();
+    commitBuffer.computeIfAbsent(instant, k -> new ArrayList<>())
+        .add(event);
+    commitIfNecessary(instant, commitBuffer.get(instant));
   }
 
   /**
    * Condition to commit: the commit buffer has equal size with the compaction 
plan operations
    * and all the compact commit event {@link CompactionCommitEvent} has the 
same compaction instant time.
+   *
+   * @param instant Compaction commit instant time
+   * @param events  Commit events ever received for the instant
    */
-  private void commitIfNecessary() throws IOException {
+  private void commitIfNecessary(String instant, List<CompactionCommitEvent> 
events) throws IOException {
     HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
-        this.writeClient.getHoodieTable().getMetaClient(), 
compactionInstantTime);
-    boolean isReady = compactionPlan.getOperations().size() == 
commitBuffer.size()
-        && commitBuffer.stream().allMatch(event -> event != null && 
Objects.equals(event.getInstant(), compactionInstantTime));
+        this.writeClient.getHoodieTable().getMetaClient(), instant);
+    boolean isReady = compactionPlan.getOperations().size() == events.size();
     if (!isReady) {
       return;
     }
-    List<WriteStatus> statuses = this.commitBuffer.stream()
+    List<WriteStatus> statuses = events.stream()
         .map(CompactionCommitEvent::getWriteStatuses)
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
@@ -127,16 +118,15 @@ public class CompactionCommitSink extends 
CleanFunction<CompactionCommitEvent> {
       }
       metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, 
writeClient.getConfig().getSchema());
       this.writeClient.completeCompaction(
-          metadata, statuses, this.writeClient.getHoodieTable(), 
compactionInstantTime);
+          metadata, statuses, this.writeClient.getHoodieTable(), instant);
     }
     // commit the compaction
-    this.writeClient.commitCompaction(compactionInstantTime, statuses, 
Option.empty());
+    this.writeClient.commitCompaction(instant, statuses, Option.empty());
     // reset the status
-    reset();
+    reset(instant);
   }
 
-  private void reset() {
-    this.commitBuffer.clear();
-    this.compactionInstantTime = null;
+  private void reset(String instant) {
+    this.commitBuffer.remove(instant);
   }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 5ea5f92..9c23259 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -21,7 +21,6 @@ package org.apache.hudi.sink.partitioner;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -56,8 +55,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * The function to build the write profile incrementally for records within a 
checkpoint,
@@ -107,28 +104,10 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
   private final boolean isChangingRecords;
 
   /**
-   * All the partition paths when the task starts. It is used to help checking 
whether all the partitions
-   * are loaded into the state.
-   */
-  private transient Set<String> initialPartitionsToLoad;
-
-  /**
    * State to book-keep which partition is loaded into the index state {@code 
indexState}.
    */
   private MapState<String, Integer> partitionLoadState;
 
-  /**
-   * Whether all partitions are loaded, if it is true,
-   * we can only check the state for locations.
-   */
-  private boolean allPartitionsLoaded = false;
-
-  /**
-   * Flag saying whether to check that all the partitions are loaded.
-   * So that there is chance that flag {@code allPartitionsLoaded} becomes 
true.
-   */
-  private boolean checkPartition = true;
-
   public BucketAssignFunction(Configuration conf) {
     this.conf = conf;
     this.isChangingRecords = WriteOperationType.isChangingRecords(
@@ -144,12 +123,11 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
         new SerializableConfiguration(this.hadoopConf),
         new FlinkTaskContextSupplier(getRuntimeContext()));
     this.bucketAssigner = BucketAssigners.create(
+        getRuntimeContext().getIndexOfThisSubtask(),
+        getRuntimeContext().getNumberOfParallelSubtasks(),
         HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
         context,
         writeConfig);
-
-    // initialize and check the partitions load state
-    loadInitialPartitions();
   }
 
   @Override
@@ -181,15 +159,7 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
     final BucketInfo bucketInfo;
     final HoodieRecordLocation location;
 
-    // Checks whether all the partitions are loaded first.
-    if (checkPartition && !allPartitionsLoaded) {
-      checkPartitionsLoaded();
-      checkPartition = false;
-    }
-
-    if (!allPartitionsLoaded
-        && initialPartitionsToLoad.contains(hoodieKey.getPartitionPath()) // 
this is an existing partition
-        && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
+    if (!partitionLoadState.contains(hoodieKey.getPartitionPath())) {
       // If the partition records are never loaded, load the records first.
       loadRecords(hoodieKey.getPartitionPath());
     }
@@ -226,9 +196,6 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
   public void notifyCheckpointComplete(long l) {
     // Refresh the table state when there are new commits.
     this.bucketAssigner.refreshTable();
-    if (!allPartitionsLoaded) {
-      checkPartition = true;
-    }
   }
 
   /**
@@ -241,12 +208,21 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
     HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
     List<HoodieBaseFile> latestBaseFiles =
         HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, 
hoodieTable);
+    final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+    final int maxParallelism = 
getRuntimeContext().getMaxNumberOfParallelSubtasks();
+    final int taskID = getRuntimeContext().getIndexOfThisSubtask();
     for (HoodieBaseFile baseFile : latestBaseFiles) {
       List<HoodieKey> hoodieKeys =
           ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new 
Path(baseFile.getPath()));
       hoodieKeys.forEach(hoodieKey -> {
         try {
-          this.indexState.put(hoodieKey, new 
HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+          // Reference: org.apache.flink.streaming.api.datastream.KeyedStream,
+          // the input records are shuffled by record key
+          boolean shouldLoad = 
KeyGroupRangeAssignment.assignKeyToParallelOperator(
+              hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID;
+          if (shouldLoad) {
+            this.indexState.put(hoodieKey, new 
HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+          }
         } catch (Exception e) {
           throw new HoodieIOException("Error when load record keys from file: 
" + baseFile);
         }
@@ -256,49 +232,9 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
     partitionLoadState.put(partitionPath, 0);
   }
 
-  /**
-   * Loads the existing partitions for this task.
-   */
-  private void loadInitialPartitions() {
-    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
-        this.conf.getString(FlinkOptions.PATH), false, false, false);
-    final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
-    final int maxParallelism = 
getRuntimeContext().getMaxNumberOfParallelSubtasks();
-    final int taskID = getRuntimeContext().getIndexOfThisSubtask();
-    // reference: org.apache.flink.streaming.api.datastream.KeyedStream
-    this.initialPartitionsToLoad = allPartitionPaths.stream()
-        .filter(partition -> 
KeyGroupRangeAssignment.assignKeyToParallelOperator(partition, maxParallelism, 
parallelism) == taskID)
-        .collect(Collectors.toSet());
-  }
-
-  /**
-   * Checks whether all the partitions of the table are loaded into the state,
-   * set the flag {@code allPartitionsLoaded} to true if it is.
-   */
-  private void checkPartitionsLoaded() {
-    for (String partition : this.initialPartitionsToLoad) {
-      try {
-        if (!this.partitionLoadState.contains(partition)) {
-          return;
-        }
-      } catch (Exception e) {
-        LOG.warn("Error when check whether all partitions are loaded, 
ignored", e);
-        throw new HoodieException(e);
-      }
-    }
-    this.allPartitionsLoaded = true;
-  }
-
-  @VisibleForTesting
-  public boolean isAllPartitionsLoaded() {
-    return this.allPartitionsLoaded;
-  }
-
   @VisibleForTesting
   public void clearIndexState() {
-    this.allPartitionsLoaded = false;
     this.indexState.clear();
-    loadInitialPartitions();
   }
 
   @VisibleForTesting
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
index 58bbe9c..d89ad83 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
@@ -61,6 +61,16 @@ public class BucketAssigner {
   private static final Logger LOG = LogManager.getLogger(BucketAssigner.class);
 
   /**
+   * Task ID.
+   */
+  private final int taskID;
+
+  /**
+   * Number of tasks.
+   */
+  private final int numTasks;
+
+  /**
    * Remembers what type each bucket is for later.
    */
   private final HashMap<String, BucketInfo> bucketInfoMap;
@@ -104,12 +114,16 @@ public class BucketAssigner {
   private final Map<String, NewFileAssignState> newFileAssignStates;
 
   public BucketAssigner(
+      int taskID,
+      int numTasks,
       HoodieFlinkEngineContext context,
       HoodieWriteConfig config) {
     bucketInfoMap = new HashMap<>();
     partitionSmallFilesMap = new HashMap<>();
     smallFileAssignStates = new HashMap<>();
     newFileAssignStates = new HashMap<>();
+    this.taskID = taskID;
+    this.numTasks = numTasks;
     this.context = context;
     this.config = config;
     this.table = HoodieFlinkTable.create(this.config, this.context);
@@ -187,7 +201,7 @@ public class BucketAssigner {
     if (partitionSmallFilesMap.containsKey(partitionPath)) {
       return partitionSmallFilesMap.get(partitionPath);
     }
-    List<SmallFile> smallFiles = getSmallFiles(partitionPath);
+    List<SmallFile> smallFiles = 
smallFilesOfThisTask(getSmallFiles(partitionPath));
     if (smallFiles.size() > 0) {
       LOG.info("For partitionPath : " + partitionPath + " Small Files => " + 
smallFiles);
       partitionSmallFilesMap.put(partitionPath, smallFiles);
@@ -240,6 +254,15 @@ public class BucketAssigner {
     return smallFileLocations;
   }
 
+  private List<SmallFile> smallFilesOfThisTask(List<SmallFile> smallFiles) {
+    // computes the small files to write inserts for this task.
+    List<SmallFile> smallFilesOfThisTask = new ArrayList<>();
+    for (int i = taskID; i < smallFiles.size(); i += numTasks) {
+      smallFilesOfThisTask.add(smallFiles.get(i));
+    }
+    return smallFilesOfThisTask;
+  }
+
   /**
    * Obtains the average record size based on records written during previous 
commits. Used for estimating how many
    * records pack into one file.
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
index f5703f1..237ec27 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
@@ -33,20 +33,24 @@ public abstract class BucketAssigners {
   /**
    * Creates a {@code BucketAssigner}.
    *
+   * @param taskID    The task ID
+   * @param numTasks  The number of tasks
    * @param tableType The table type
    * @param context   The engine context
    * @param config    The configuration
    * @return the bucket assigner instance
    */
   public static BucketAssigner create(
+      int taskID,
+      int numTasks,
       HoodieTableType tableType,
       HoodieFlinkEngineContext context,
       HoodieWriteConfig config) {
     switch (tableType) {
       case COPY_ON_WRITE:
-        return new BucketAssigner(context, config);
+        return new BucketAssigner(taskID, numTasks, context, config);
       case MERGE_ON_READ:
-        return new DeltaBucketAssigner(context, config);
+        return new DeltaBucketAssigner(taskID, numTasks, context, config);
       default:
         throw new AssertionError();
     }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java
index 895f593..deb8250 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java
@@ -40,8 +40,12 @@ import java.util.stream.Collectors;
  * <p>Note: assumes the index can always index log files for Flink write.
  */
 public class DeltaBucketAssigner extends BucketAssigner {
-  public DeltaBucketAssigner(HoodieFlinkEngineContext context, 
HoodieWriteConfig config) {
-    super(context, config);
+  public DeltaBucketAssigner(
+      int taskID,
+      int numTasks,
+      HoodieFlinkEngineContext context,
+      HoodieWriteConfig config) {
+    super(taskID, numTasks, context, config);
   }
 
   @Override
@@ -77,11 +81,13 @@ public class DeltaBucketAssigner extends BucketAssigner {
           sf.sizeBytes = getTotalFileSize(smallFileSlice);
           smallFileLocations.add(sf);
         } else {
-          HoodieLogFile logFile = 
smallFileSlice.getLogFiles().findFirst().get();
-          sf.location = new 
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-              FSUtils.getFileIdFromLogPath(logFile.getPath()));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
+          smallFileSlice.getLogFiles().findFirst().ifPresent(logFile -> {
+            // in case something is wrong and the file slice has no log file
+            sf.location = new 
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+                FSUtils.getFileIdFromLogPath(logFile.getPath()));
+            sf.sizeBytes = getTotalFileSize(smallFileSlice);
+            smallFileLocations.add(sf);
+          });
         }
       }
     }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
index 1d41003..5bd3c68 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
@@ -23,6 +23,9 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.util.RowDataToAvroConverters;
@@ -36,7 +39,11 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
 
 /**
  * Function that transforms RowData to HoodieRecord.
@@ -64,6 +71,11 @@ public class RowDataToHoodieFunction<I extends RowData, O 
extends HoodieRecord<?
   private transient KeyGenerator keyGenerator;
 
   /**
+   * Utilities to create the hoodie payload instance.
+   */
+  private transient PayloadCreation payloadCreation;
+
+  /**
    * Config options.
    */
   private final Configuration config;
@@ -79,6 +91,7 @@ public class RowDataToHoodieFunction<I extends RowData, O 
extends HoodieRecord<?
     this.avroSchema = StreamerUtil.getSourceSchema(this.config);
     this.converter = RowDataToAvroConverters.createConverter(this.rowType);
     this.keyGenerator = 
StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config));
+    this.payloadCreation = PayloadCreation.instance(config);
   }
 
   @SuppressWarnings("unchecked")
@@ -95,19 +108,61 @@ public class RowDataToHoodieFunction<I extends RowData, O 
extends HoodieRecord<?
    * @throws IOException if error occurs
    */
   @SuppressWarnings("rawtypes")
-  private HoodieRecord toHoodieRecord(I record) throws IOException {
-    boolean shouldCombine = 
this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
-        || 
WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == 
WriteOperationType.UPSERT;
+  private HoodieRecord toHoodieRecord(I record) throws Exception {
     GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, 
record);
-    final String payloadClazz = 
this.config.getString(FlinkOptions.PAYLOAD_CLASS);
-    Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
-        this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false);
     final HoodieKey hoodieKey = keyGenerator.getKey(gr);
     // nullify the payload insert data to mark the record as a DELETE
-    gr = record.getRowKind() == RowKind.DELETE ? null : gr;
-    HoodieRecordPayload payload = shouldCombine
-        ? StreamerUtil.createPayload(payloadClazz, gr, orderingVal)
-        : StreamerUtil.createPayload(payloadClazz, gr);
+    final boolean isDelete = record.getRowKind() == RowKind.DELETE;
+    HoodieRecordPayload payload = payloadCreation.createPayload(gr, isDelete);
     return new HoodieRecord<>(hoodieKey, payload);
   }
+
+  /**
+   * Util to create the hoodie payload instance.
+   */
+  private static class PayloadCreation implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final boolean shouldCombine;
+    private final Constructor<?> constructor;
+    private final String preCombineField;
+
+    private PayloadCreation(
+        boolean shouldCombine,
+        Constructor<?> constructor,
+        @Nullable String preCombineField) {
+      this.shouldCombine = shouldCombine;
+      this.constructor = constructor;
+      this.preCombineField = preCombineField;
+    }
+
+    public static PayloadCreation instance(Configuration conf) throws 
Exception {
+      boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
+          || 
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == 
WriteOperationType.UPSERT;
+      String preCombineField = null;
+      final Class<?>[] argTypes;
+      final Constructor<?> constructor;
+      if (shouldCombine) {
+        preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
+        argTypes = new Class<?>[] {GenericRecord.class, Comparable.class};
+      } else {
+        argTypes = new Class<?>[] {Option.class};
+      }
+      final String clazz = conf.getString(FlinkOptions.PAYLOAD_CLASS);
+      constructor = ReflectionUtils.getClass(clazz).getConstructor(argTypes);
+      return new PayloadCreation(shouldCombine, constructor, preCombineField);
+    }
+
+    public HoodieRecordPayload<?> createPayload(GenericRecord record, boolean 
isDelete) throws Exception {
+      if (shouldCombine) {
+        ValidationUtils.checkState(preCombineField != null);
+        Comparable<?> orderingVal = (Comparable<?>) 
HoodieAvroUtils.getNestedFieldVal(record,
+            preCombineField, false);
+        return (HoodieRecordPayload<?>) constructor.newInstance(
+            isDelete ? null : record, orderingVal);
+      } else {
+        return (HoodieRecordPayload<?>) 
this.constructor.newInstance(Option.of(record));
+      }
+    }
+  }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java
new file mode 100644
index 0000000..b595767
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.slf4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Coordinator executor that executes the tasks asynchronously; it fails the job
+ * for any task exceptions.
+ *
+ * <p>We need this because the coordinator methods are called by
+ * the Job Manager's main thread (mailbox thread); we execute the methods asynchronously
+ * to avoid blocking the main thread.
+ */
+public class CoordinatorExecutor extends NonThrownExecutor {
+  private final OperatorCoordinator.Context context;
+
+  public CoordinatorExecutor(OperatorCoordinator.Context context, Logger 
logger) {
+    super(logger);
+    this.context = context;
+  }
+
+  @Override
+  protected void exceptionHook(String actionString, Throwable t) {
+    this.context.failJob(new HoodieException(actionString, t));
+  }
+
+  @Override
+  public void close() throws Exception {
+    // wait for the remaining tasks to finish.
+    executor.shutdown();
+    // We do not expect this to actually block for long. At this point, there 
should
+    // be very few tasks running in the executor, if any.
+    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+  }
+}
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java
index 1d0542e..87c9c01 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java
@@ -21,7 +21,6 @@ package org.apache.hudi.sink.utils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.ThrowingRunnable;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -31,15 +30,16 @@ import java.util.concurrent.TimeUnit;
  * An executor service that catches all the throwable with logging.
  */
 public class NonThrownExecutor implements AutoCloseable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(NonThrownExecutor.class);
+  private final Logger logger;
 
   /**
    * A single-thread executor to handle all the asynchronous jobs.
    */
-  private final ExecutorService executor;
+  protected final ExecutorService executor;
 
-  public NonThrownExecutor() {
+  public NonThrownExecutor(Logger logger) {
     this.executor = Executors.newSingleThreadExecutor();
+    this.logger = logger;
   }
 
   /**
@@ -48,24 +48,30 @@ public class NonThrownExecutor implements AutoCloseable {
   public void execute(
       final ThrowingRunnable<Throwable> action,
       final String actionName,
-      final String instant) {
+      final Object... actionParams) {
 
     executor.execute(
         () -> {
+          final String actionString = String.format(actionName, actionParams);
           try {
             action.run();
-            LOG.info("Executor executes action [{}] for instant [{}] 
success!", actionName, instant);
+            logger.info("Executor executes action [{}] success!", 
actionString);
           } catch (Throwable t) {
             // if we have a JVM critical error, promote it immediately, there 
is a good
             // chance the
             // logging or job failing will not succeed any more
             ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-            final String errMsg = String.format("Executor executes action [%s] 
error", actionName);
-            LOG.error(errMsg, t);
+            final String errMsg = String.format("Executor executes action [%s] 
error", actionString);
+            logger.error(errMsg, t);
+            exceptionHook(errMsg, t);
           }
         });
   }
 
+  protected void exceptionHook(String errMsg, Throwable t) {
+    // for sub-class to override.
+  }
+
   @Override
   public void close() throws Exception {
     if (executor != null) {
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java 
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 28568f7..a568a3f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -69,8 +69,8 @@ public class HoodieTableSink implements DynamicTableSink, 
SupportsPartitioning {
 
       DataStream<Object> pipeline = dataStream
           .map(new RowDataToHoodieFunction<>(rowType, conf), 
TypeInformation.of(HoodieRecord.class))
-          // Key-by partition path, to avoid multiple subtasks write to a 
partition at the same time
-          .keyBy(HoodieRecord::getPartitionPath)
+          // Key-by record key, to avoid multiple subtasks write to a bucket 
at the same time
+          .keyBy(HoodieRecord::getRecordKey)
           .transform(
               "bucket_assigner",
               TypeInformation.of(HoodieRecord.class),
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java 
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 900ec41..8fed30b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.TablePathUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
@@ -179,19 +180,6 @@ public class StreamerUtil {
     }
   }
 
-  /**
-   * Create a payload class via reflection, do not ordering/precombine value.
-   */
-  public static HoodieRecordPayload createPayload(String payloadClass, 
GenericRecord record)
-      throws IOException {
-    try {
-      return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
-          new Class<?>[] {Option.class}, Option.of(record));
-    } catch (Throwable e) {
-      throw new IOException("Could not create payload for class: " + 
payloadClass, e);
-    }
-  }
-
   public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig 
conf) {
     return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf));
   }
@@ -215,6 +203,12 @@ public class StreamerUtil {
                     // actually Flink cleaning is always with parallelism 1 now
                     .withCleanerParallelism(20)
                     .build())
+            .withMemoryConfig(
+                HoodieMemoryConfig.newBuilder()
+                    .withMaxMemoryMaxSize(
+                        conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 
1024 * 1024L,
+                        conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 
1024 * 1024L
+                        ).build())
             .forTable(conf.getString(FlinkOptions.TABLE_NAME))
             .withAutoCommit(false)
             
.withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java 
b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index 94e0975..361fcef 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -120,8 +120,8 @@ public class StreamWriteITCase extends TestLogger {
         .map(record -> 
deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
         .setParallelism(4)
         .map(new RowDataToHoodieFunction<>(rowType, conf), 
TypeInformation.of(HoodieRecord.class))
-        // Key-by partition path, to avoid multiple subtasks write to a 
partition at the same time
-        .keyBy(HoodieRecord::getPartitionPath)
+        // Key-by record key, to avoid multiple subtasks write to a bucket at 
the same time
+        .keyBy(HoodieRecord::getRecordKey)
         .transform(
             "bucket_assigner",
             TypeInformation.of(HoodieRecord.class),
@@ -179,8 +179,8 @@ public class StreamWriteITCase extends TestLogger {
         .name("instant_generator")
         .uid("instant_generator_id")
 
-        // Keyby partition path, to avoid multiple subtasks writing to a 
partition at the same time
-        .keyBy(HoodieRecord::getPartitionPath)
+        // Key-by record key, to avoid multiple subtasks write to a bucket at 
the same time
+        .keyBy(HoodieRecord::getRecordKey)
         // use the bucket assigner to generate bucket IDs
         .transform(
             "bucket_assigner",
@@ -249,8 +249,8 @@ public class StreamWriteITCase extends TestLogger {
         .map(record -> 
deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
         .setParallelism(4)
         .map(new RowDataToHoodieFunction<>(rowType, conf), 
TypeInformation.of(HoodieRecord.class))
-        // Key-by partition path, to avoid multiple subtasks write to a 
partition at the same time
-        .keyBy(HoodieRecord::getPartitionPath)
+        // Key-by record key, to avoid multiple subtasks write to a bucket at 
the same time
+        .keyBy(HoodieRecord::getRecordKey)
         .transform(
             "bucket_assigner",
             TypeInformation.of(HoodieRecord.class),
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
 
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 321abfa..0afd414 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -23,12 +23,15 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
+import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -42,11 +45,12 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -60,9 +64,11 @@ public class TestStreamWriteOperatorCoordinator {
 
   @BeforeEach
   public void before() throws Exception {
+    OperatorCoordinator.Context context = new 
MockOperatorCoordinatorContext(new OperatorID(), 2);
     coordinator = new StreamWriteOperatorCoordinator(
-        TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2);
+        TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 
context);
     coordinator.start();
+    coordinator.setExecutor(new MockCoordinatorExecutor(context));
   }
 
   @AfterEach
@@ -99,8 +105,8 @@ public class TestStreamWriteOperatorCoordinator {
 
     coordinator.notifyCheckpointComplete(1);
     String inflight = coordinator.getWriteClient()
-        .getInflightAndRequestedInstant("COPY_ON_WRITE");
-    String lastCompleted = 
coordinator.getWriteClient().getLastCompletedInstant("COPY_ON_WRITE");
+        .getInflightAndRequestedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+    String lastCompleted = 
coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
     assertThat("Instant should be complete", lastCompleted, is(instant));
     assertNotEquals("", inflight, "Should start a new instant");
     assertNotEquals(instant, inflight, "Should start a new instant");
@@ -131,27 +137,43 @@ public class TestStreamWriteOperatorCoordinator {
         .instantTime("abc")
         .writeStatus(Collections.emptyList())
         .build();
-    assertThrows(IllegalStateException.class,
-        () -> coordinator.handleEventFromOperator(0, event),
+
+    assertError(() -> coordinator.handleEventFromOperator(0, event),
         "Receive an unexpected event for instant abc from task 0");
   }
 
   @Test
-  public void testCheckpointCompleteWithException() {
+  public void testCheckpointCompleteWithPartialEvents() {
     final CompletableFuture<byte[]> future = new CompletableFuture<>();
     coordinator.checkpointCoordinator(1, future);
-    String inflightInstant = coordinator.getInstant();
+    String instant = coordinator.getInstant();
     OperatorEvent event = BatchWriteSuccessEvent.builder()
         .taskID(0)
-        .instantTime(inflightInstant)
+        .instantTime(instant)
         .writeStatus(Collections.emptyList())
         .build();
     coordinator.handleEventFromOperator(0, event);
-    assertThrows(HoodieException.class,
-        () -> coordinator.notifyCheckpointComplete(1),
-        "org.apache.hudi.exception.HoodieException: Instant [20210330153432] 
has a complete checkpoint [1],\n"
-            + "but the coordinator has not received full write success 
events,\n"
-            + "rolls back the instant and rethrow");
+
+    assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1),
+        "Returns early for empty write results");
+    String lastCompleted = 
coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+    assertNull(lastCompleted, "Returns early for empty write results");
+    assertNull(coordinator.getEventBuffer()[0]);
+
+    WriteStatus writeStatus1 = new WriteStatus(false, 0.2D);
+    writeStatus1.setPartitionPath("par2");
+    writeStatus1.setStat(new HoodieWriteStat());
+    OperatorEvent event1 = BatchWriteSuccessEvent.builder()
+        .taskID(1)
+        .instantTime(instant)
+        .writeStatus(Collections.singletonList(writeStatus1))
+        .isLastBatch(true)
+        .build();
+    coordinator.handleEventFromOperator(1, event1);
+    assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2),
+        "Commits the instant with partial events anyway");
+    lastCompleted = 
coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+    assertThat("Commits the instant with partial events anyway", 
lastCompleted, is(instant));
   }
 
   @Test
@@ -159,8 +181,10 @@ public class TestStreamWriteOperatorCoordinator {
     // override the default configuration
     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, true);
-    coordinator = new StreamWriteOperatorCoordinator(conf, 1);
+    OperatorCoordinator.Context context = new 
MockOperatorCoordinatorContext(new OperatorID(), 1);
+    coordinator = new StreamWriteOperatorCoordinator(conf, context);
     coordinator.start();
+    coordinator.setExecutor(new MockCoordinatorExecutor(context));
 
     String instant = coordinator.getInstant();
     assertNotEquals("", instant);
@@ -180,4 +204,16 @@ public class TestStreamWriteOperatorCoordinator {
     // never throw for hive synchronization now
     assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1));
   }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private void assertError(Runnable runnable, String message) {
+    runnable.run();
+    // wait a little while for the task to finish
+    assertThat(coordinator.getContext(), 
instanceOf(MockOperatorCoordinatorContext.class));
+    MockOperatorCoordinatorContext context = (MockOperatorCoordinatorContext) 
coordinator.getContext();
+    assertTrue(context.isJobFailed(), message);
+  }
 }
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java 
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 0e53cfa..2384f7e 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
 import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
 import org.apache.hudi.utils.TestConfigurations;
@@ -53,11 +52,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
- * Test cases for StreamingSinkFunction.
+ * Test cases for stream write.
  */
 public class TestWriteCopyOnWrite {
 
@@ -193,19 +191,19 @@ public class TestWriteCopyOnWrite {
     assertThat(writeStatuses.size(), is(0)); // no data write
 
     // fails the checkpoint
-    assertThrows(HoodieException.class,
-        () -> funcWrapper.checkpointFails(1),
-        "The last checkpoint was aborted, roll back the last write and throw");
+    funcWrapper.checkpointFails(1);
+    assertFalse(funcWrapper.getCoordinatorContext().isJobFailed(),
+        "The last checkpoint was aborted, ignore the events");
 
-    // the instant metadata should be cleared
-    checkInstantState(funcWrapper.getWriteClient(), 
HoodieInstant.State.REQUESTED, null);
+    // the instant metadata should be reused
+    checkInstantState(funcWrapper.getWriteClient(), 
HoodieInstant.State.REQUESTED, instant);
     checkInstantState(funcWrapper.getWriteClient(), 
HoodieInstant.State.COMPLETED, null);
 
     for (RowData rowData : TestData.DATA_SET_INSERT) {
       funcWrapper.invoke(rowData);
     }
 
-    // this returns early cause there is no inflight instant
+    // this returns early because there is no inflight instant
     funcWrapper.checkpointFunction(2);
     // do not sent the write event and fails the checkpoint,
     // behaves like the last checkpoint is successful.
@@ -501,16 +499,11 @@ public class TestWriteCopyOnWrite {
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the 
event");
 
     checkInstantState(funcWrapper.getWriteClient(), 
HoodieInstant.State.REQUESTED, instant);
-    assertFalse(funcWrapper.isAllPartitionsLoaded(),
-        "All partitions assume to be loaded into the index state");
+
     funcWrapper.checkpointComplete(2);
     // the coordinator checkpoint commits the inflight instant.
     checkInstantState(funcWrapper.getWriteClient(), 
HoodieInstant.State.COMPLETED, instant);
     checkWrittenData(tempFile, EXPECTED2);
-    // next element triggers all partitions load check
-    funcWrapper.invoke(TestData.DATA_SET_INSERT.get(0));
-    assertTrue(funcWrapper.isAllPartitionsLoaded(),
-        "All partitions assume to be loaded into the index state");
   }
 
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
 
b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
index 04cee44..33457b5 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
@@ -146,6 +146,55 @@ public class TestBucketAssigner {
     assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
   }
 
+  /**
+   * Test that only partial small files are assigned to the task.
+   */
+  @Test
+  public void testInsertWithPartialSmallFiles() {
+    SmallFile f0 = new SmallFile();
+    f0.location = new HoodieRecordLocation("t0", "f0");
+    f0.sizeBytes = 12;
+
+    SmallFile f1 = new SmallFile();
+    f1.location = new HoodieRecordLocation("t0", "f1");
+    f1.sizeBytes = 122879; // no space left to append new records to this bucket
+
+    SmallFile f2 = new SmallFile();
+    f2.location = new HoodieRecordLocation("t0", "f2");
+    f2.sizeBytes = 56;
+
+    Map<String, List<SmallFile>> smallFilesMap = new HashMap<>();
+    smallFilesMap.put("par1", Arrays.asList(f0, f1, f2));
+
+    MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(0, 2, 
context, writeConfig, smallFilesMap);
+    BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+    mockBucketAssigner.addInsert("par1");
+    bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+    bucketInfo = mockBucketAssigner.addInsert("par3");
+    assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
+
+    bucketInfo = mockBucketAssigner.addInsert("par3");
+    assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
+
+    MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(1, 2, 
context, writeConfig, smallFilesMap);
+    BucketInfo bucketInfo2 = mockBucketAssigner2.addInsert("par1");
+    assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1");
+
+    mockBucketAssigner2.addInsert("par1");
+    bucketInfo2 = mockBucketAssigner2.addInsert("par1");
+    assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1");
+
+    bucketInfo2 = mockBucketAssigner2.addInsert("par3");
+    assertBucketEquals(bucketInfo2, "par3", BucketType.INSERT);
+
+    bucketInfo2 = mockBucketAssigner2.addInsert("par3");
+    assertBucketEquals(bucketInfo2, "par3", BucketType.INSERT);
+  }
+
   @Test
   public void testUpdateAndInsertWithSmallFiles() {
     SmallFile f0 = new SmallFile();
@@ -187,6 +236,60 @@ public class TestBucketAssigner {
     assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "f2");
   }
 
+  /**
+   * Test that only partial small files are assigned to the task.
+   */
+  @Test
+  public void testUpdateAndInsertWithPartialSmallFiles() {
+    SmallFile f0 = new SmallFile();
+    f0.location = new HoodieRecordLocation("t0", "f0");
+    f0.sizeBytes = 12;
+
+    SmallFile f1 = new SmallFile();
+    f1.location = new HoodieRecordLocation("t0", "f1");
+    f1.sizeBytes = 122879; // no space left to append new records to this bucket
+
+    SmallFile f2 = new SmallFile();
+    f2.location = new HoodieRecordLocation("t0", "f2");
+    f2.sizeBytes = 56;
+
+    Map<String, List<SmallFile>> smallFilesMap = new HashMap<>();
+    smallFilesMap.put("par1", Arrays.asList(f0, f1, f2));
+
+    MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(0, 2, 
context, writeConfig, smallFilesMap);
+    mockBucketAssigner.addUpdate("par1", "f0");
+
+    BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+    mockBucketAssigner.addInsert("par1");
+    bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+    mockBucketAssigner.addUpdate("par1", "f2");
+
+    mockBucketAssigner.addInsert("par1");
+    bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+
+    MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(1, 2, 
context, writeConfig, smallFilesMap);
+    mockBucketAssigner2.addUpdate("par1", "f0");
+
+    BucketInfo bucketInfo2 = mockBucketAssigner2.addInsert("par1");
+    assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1");
+
+    mockBucketAssigner2.addInsert("par1");
+    bucketInfo2 = mockBucketAssigner2.addInsert("par1");
+    assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1");
+
+    mockBucketAssigner2.addUpdate("par1", "f2");
+
+    mockBucketAssigner2.addInsert("par1");
+    bucketInfo2 = mockBucketAssigner2.addInsert("par1");
+    assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1");
+  }
+
   private void assertBucketEquals(
       BucketInfo bucketInfo,
       String partition,
@@ -220,7 +323,16 @@ public class TestBucketAssigner {
         HoodieFlinkEngineContext context,
         HoodieWriteConfig config,
         Map<String, List<SmallFile>> smallFilesMap) {
-      super(context, config);
+      this(0, 1, context, config, smallFilesMap);
+    }
+
+    MockBucketAssigner(
+        int taskID,
+        int numTasks,
+        HoodieFlinkEngineContext context,
+        HoodieWriteConfig config,
+        Map<String, List<SmallFile>> smallFilesMap) {
+      super(taskID, numTasks, context, config);
       this.smallFilesMap = smallFilesMap;
     }
 
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
 
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
index e8796ec..fe3dd81 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
@@ -28,7 +28,9 @@ import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.memory.MemoryManager;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.streaming.api.operators.Output;
@@ -78,6 +80,9 @@ public class CompactFunctionWrapper {
     compactFunction = new CompactFunction(conf);
     compactFunction.setRuntimeContext(runtimeContext);
     compactFunction.open(conf);
+    final NonThrownExecutor syncExecutor = new MockCoordinatorExecutor(
+        new MockOperatorCoordinatorContext(new OperatorID(), 1));
+    compactFunction.setExecutor(syncExecutor);
 
     commitSink = new CompactionCommitSink(conf);
     commitSink.setRuntimeContext(runtimeContext);
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java
 
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java
new file mode 100644
index 0000000..099dfd6
--- /dev/null
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A mock {@link CoordinatorExecutor} that executes the actions synchronously.
+ */
+public class MockCoordinatorExecutor extends CoordinatorExecutor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MockCoordinatorExecutor.class);
+
+  public MockCoordinatorExecutor(OperatorCoordinator.Context context) {
+    super(context, LOG);
+  }
+
+  @Override
+  public void execute(ThrowingRunnable<Throwable> action, String actionName, 
Object... actionParams) {
+    final String actionString = String.format(actionName, actionParams);
+    try {
+      action.run();
+      LOG.info("Executor executes action [{}] success!", actionString);
+    } catch (Throwable t) {
+      // if we have a JVM critical error, promote it immediately, there is a 
good
+      // chance the
+      // logging or job failing will not succeed any more
+      ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+      exceptionHook(actionString, t);
+    }
+  }
+}
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 72f2e89..a4b6c16 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -33,7 +33,9 @@ import org.apache.hudi.utils.TestConfigurations;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.memory.MemoryManager;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
@@ -57,6 +59,7 @@ public class StreamWriteFunctionWrapper<I> {
   private final IOManager ioManager;
   private final StreamingRuntimeContext runtimeContext;
   private final MockOperatorEventGateway gateway;
+  private final MockOperatorCoordinatorContext coordinatorContext;
   private final StreamWriteOperatorCoordinator coordinator;
   private final MockFunctionInitializationContext 
functionInitializationContext;
 
@@ -84,13 +87,15 @@ public class StreamWriteFunctionWrapper<I> {
     this.gateway = new MockOperatorEventGateway();
     this.conf = conf;
     // one function
-    this.coordinator = new StreamWriteOperatorCoordinator(conf, 1);
+    this.coordinatorContext = new MockOperatorCoordinatorContext(new 
OperatorID(), 1);
+    this.coordinator = new StreamWriteOperatorCoordinator(conf, 
this.coordinatorContext);
     this.functionInitializationContext = new 
MockFunctionInitializationContext();
     this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
   }
 
   public void openFunction() throws Exception {
     this.coordinator.start();
+    this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
     toHoodieFunction = new 
RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf);
     toHoodieFunction.setRuntimeContext(runtimeContext);
     toHoodieFunction.open(conf);
@@ -181,6 +186,10 @@ public class StreamWriteFunctionWrapper<I> {
     return coordinator;
   }
 
+  public MockOperatorCoordinatorContext getCoordinatorContext() {
+    return coordinatorContext;
+  }
+
   public void clearIndexState() {
     this.bucketAssignerFunction.clearIndexState();
   }
@@ -188,8 +197,4 @@ public class StreamWriteFunctionWrapper<I> {
   public boolean isKeyInState(HoodieKey hoodieKey) {
     return this.bucketAssignerFunction.isKeyInState(hoodieKey);
   }
-
-  public boolean isAllPartitionsLoaded() {
-    return this.bucketAssignerFunction.isAllPartitionsLoaded();
-  }
 }

Reply via email to