This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c5b692  [HUDI-1557] Make Flink write pipeline write task scalable 
(#2506)
4c5b692 is described below

commit 4c5b6923ccfaaa6616a934a3f690b1a795a42d41
Author: Danny Chan <[email protected]>
AuthorDate: Sat Feb 6 22:03:52 2021 +0800

    [HUDI-1557] Make Flink write pipeline write task scalable (#2506)
    
    This is step #2 of RFC-24:
    
https://cwiki.apache.org/confluence/display/HUDI/RFC+-+24%3A+Hoodie+Flink+Writer+Proposal
    
    This PR introduces a BucketAssigner that assigns a bucket ID (partition
    path & fileID) to each stream record.
    
    There is no longer any need to look up the index and partition these
    records in the downstream pipeline: the write target location is decided
    before the write, and each record's location is computed when the
    BucketAssigner receives it, so the indexing is done in a streaming
    style.
    
    Computing the locations for a whole batch of records at once is
    resource-consuming and puts pressure on the engine, so we should avoid
    it in a streaming system.
---
 .../java/org/apache/hudi/table/WorkloadStat.java   |   8 +-
 .../hudi/table/action/commit/BucketInfo.java       |  38 +++
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  15 +-
 .../hudi/index/state/FlinkInMemoryStateIndex.java  |  40 +--
 .../apache/hudi/io/FlinkCreateHandleFactory.java   |  41 +++
 .../commit/BaseFlinkCommitActionExecutor.java      | 199 ++++---------
 .../hudi/table/action/commit/FlinkWriteHelper.java |  46 ++-
 .../table/action/commit/UpsertPartitioner.java     |  10 +-
 .../table/action/commit/UpsertPartitioner.java     |  10 +-
 .../table/action/commit/UpsertPartitioner.java     |  10 +-
 .../table/timeline/HoodieActiveTimeline.java       |   6 +
 .../hudi/operator/InstantGenerateOperator.java     |  18 +-
 .../hudi/operator/KeyedWriteProcessFunction.java   |  50 +++-
 .../apache/hudi/operator/StreamWriteFunction.java  |  86 ++----
 .../apache/hudi/operator/StreamWriteOperator.java  |   5 +-
 .../operator/StreamWriteOperatorCoordinator.java   |  32 +-
 .../hudi/operator/StreamWriteOperatorFactory.java  |   4 +-
 .../operator/partitioner/BucketAssignFunction.java | 149 ++++++++++
 .../hudi/operator/partitioner/BucketAssigner.java  | 326 +++++++++++++++++++++
 .../transform/RowDataToHoodieFunction.java         | 108 +++++++
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |   3 +-
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |  19 +-
 .../hudi/streamer/HoodieFlinkStreamerV2.java       |  28 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  43 +++
 .../hudi/operator/StreamWriteFunctionTest.java     |  65 +++-
 .../apache/hudi/operator/StreamWriteITCase.java    | 119 +++++++-
 .../operator/partitioner/TestBucketAssigner.java   | 235 +++++++++++++++
 .../operator/utils/StreamWriteFunctionWrapper.java |  74 +++--
 .../hudi/operator/utils/TestConfigurations.java    |  13 +
 .../org/apache/hudi/operator/utils/TestData.java   |  28 +-
 30 files changed, 1435 insertions(+), 393 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java
index 6fdb217..c3371ba 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java
@@ -44,7 +44,13 @@ public class WorkloadStat implements Serializable {
   }
 
   public long addUpdates(HoodieRecordLocation location, long numUpdates) {
-    updateLocationToCount.put(location.getFileId(), 
Pair.of(location.getInstantTime(), numUpdates));
+    long accNumUpdates = 0;
+    if (updateLocationToCount.containsKey(location.getFileId())) {
+      accNumUpdates = 
updateLocationToCount.get(location.getFileId()).getRight();
+    }
+    updateLocationToCount.put(
+        location.getFileId(),
+        Pair.of(location.getInstantTime(), numUpdates + accNumUpdates));
     return this.numUpdates += numUpdates;
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
index 1d98ad4..6547da6 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * Helper class for a bucket's type (INSERT and UPDATE) and its file location.
@@ -29,6 +30,24 @@ public class BucketInfo implements Serializable {
   String fileIdPrefix;
   String partitionPath;
 
+  public BucketInfo(BucketType bucketType, String fileIdPrefix, String 
partitionPath) {
+    this.bucketType = bucketType;
+    this.fileIdPrefix = fileIdPrefix;
+    this.partitionPath = partitionPath;
+  }
+
+  public BucketType getBucketType() {
+    return bucketType;
+  }
+
+  public String getFileIdPrefix() {
+    return fileIdPrefix;
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("BucketInfo {");
@@ -38,4 +57,23 @@ public class BucketInfo implements Serializable {
     sb.append('}');
     return sb.toString();
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    BucketInfo that = (BucketInfo) o;
+    return bucketType == that.bucketType
+        && fileIdPrefix.equals(that.fileIdPrefix)
+        && partitionPath.equals(that.partitionPath);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(bucketType, fileIdPrefix, partitionPath);
+  }
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index e3e0eb4..0c87f7d 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CommitUtils;
@@ -249,7 +250,17 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
   public void deletePendingInstant(String tableType, String instant) {
     HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, 
(HoodieFlinkEngineContext) context);
     String commitType = 
CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
-    table.getMetaClient().getActiveTimeline()
-        .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, 
commitType, instant));
+    HoodieActiveTimeline activeTimeline = 
table.getMetaClient().getActiveTimeline();
+    activeTimeline.deletePending(HoodieInstant.State.INFLIGHT, commitType, 
instant);
+    activeTimeline.deletePending(HoodieInstant.State.REQUESTED, commitType, 
instant);
+  }
+
+  public void transitionRequestedToInflight(String tableType, String 
inFlightInstant) {
+    HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, 
(HoodieFlinkEngineContext) context);
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    String commitType = 
CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
+    HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, 
commitType, inFlightInstant);
+    activeTimeline.transitionRequestedToInflight(requested, Option.empty(),
+        config.shouldAllowMultiWriteOnSameInstant());
   }
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
index 44eafd5..bae8de2 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.FlinkHoodieIndex;
@@ -62,47 +61,14 @@ public class FlinkInMemoryStateIndex<T extends 
HoodieRecordPayload> extends Flin
   public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records,
                                            HoodieEngineContext context,
                                            HoodieTable<T, 
List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws 
HoodieIndexException {
-    return context.map(records, record -> {
-      try {
-        if (mapState.contains(record.getKey())) {
-          record.unseal();
-          record.setCurrentLocation(mapState.get(record.getKey()));
-          record.seal();
-        }
-      } catch (Exception e) {
-        LOG.error(String.format("Tag record location failed, key = %s, %s", 
record.getRecordKey(), e.getMessage()));
-      }
-      return record;
-    }, 0);
+    throw new UnsupportedOperationException("No need to tag location for 
FlinkInMemoryStateIndex");
   }
 
   @Override
   public List<WriteStatus> updateLocation(List<WriteStatus> writeStatuses,
                                           HoodieEngineContext context,
                                           HoodieTable<T, 
List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws 
HoodieIndexException {
-    return context.map(writeStatuses, writeStatus -> {
-      for (HoodieRecord record : writeStatus.getWrittenRecords()) {
-        if (!writeStatus.isErrored(record.getKey())) {
-          HoodieKey key = record.getKey();
-          Option<HoodieRecordLocation> newLocation = record.getNewLocation();
-          if (newLocation.isPresent()) {
-            try {
-              mapState.put(key, newLocation.get());
-            } catch (Exception e) {
-              LOG.error(String.format("Update record location failed, key = 
%s, %s", record.getRecordKey(), e.getMessage()));
-            }
-          } else {
-            // Delete existing index for a deleted record
-            try {
-              mapState.remove(key);
-            } catch (Exception e) {
-              LOG.error(String.format("Remove record location failed, key = 
%s, %s", record.getRecordKey(), e.getMessage()));
-            }
-          }
-        }
-      }
-      return writeStatus;
-    }, 0);
+    throw new UnsupportedOperationException("No need to update location for 
FlinkInMemoryStateIndex");
   }
 
   @Override
@@ -128,6 +94,6 @@ public class FlinkInMemoryStateIndex<T extends 
HoodieRecordPayload> extends Flin
    */
   @Override
   public boolean isImplicitWithStorage() {
-    return false;
+    return true;
   }
 }
\ No newline at end of file
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java
new file mode 100644
index 0000000..d65663e
--- /dev/null
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Create handle factory for Flink writer, use the specified fileID directly
+ * because it is unique anyway.
+ */
+public class FlinkCreateHandleFactory<T extends HoodieRecordPayload, I, K, O>
+    extends CreateHandleFactory<T, I, K, O> {
+
+  @Override
+  public HoodieWriteHandle<T, I, K, O> create(
+      HoodieWriteConfig hoodieConfig, String commitTime,
+      HoodieTable<T, I, K, O> hoodieTable, String partitionPath,
+      String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
+    return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, 
partitionPath,
+        fileIdPrefix, taskContextSupplier);
+  }
+}
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index 337e7cb..044f841 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -20,51 +20,52 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.FlinkLazyInsertIterable;
-import org.apache.hudi.io.CreateHandleFactory;
+import org.apache.hudi.io.FlinkCreateHandleFactory;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieSortedMergeHandle;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.WorkloadProfile;
-import org.apache.hudi.table.WorkloadStat;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
-import java.time.Instant;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import scala.Tuple2;
-
+/**
+ * With {@code org.apache.hudi.operator.partitioner.BucketAssigner}, each 
hoodie record
+ * is tagged with a bucket ID (partition path + fileID) in streaming way. All 
the records consumed by this
+ * executor should be tagged with bucket IDs and belong to one data bucket.
+ *
+ * <p>These bucket IDs make it possible to shuffle the records first by the 
bucket ID
+ * (see org.apache.hudi.operator.partitioner.BucketAssignerFunction), and this 
executor
+ * only needs to handle the data buffer that belongs to one data bucket once 
at a time. So there is no need to
+ * partition the buffer.
+ *
+ * <p>Computing the records batch locations all at a time is a pressure to the 
engine,
+ * we should avoid that in streaming system.
+ */
 public abstract class BaseFlinkCommitActionExecutor<T extends 
HoodieRecordPayload> extends
     BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>, HoodieWriteMetadata> {
 
@@ -91,47 +92,39 @@ public abstract class BaseFlinkCommitActionExecutor<T 
extends HoodieRecordPayloa
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> 
inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new 
HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), 
instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), 
inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime 
+ " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + 
" unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = 
partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), 
partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), 
partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
-    updateIndex(writeStatuses, result);
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();
+    final String fileId = record.getCurrentLocation().getFileId();
+    final BucketType bucketType = 
record.getCurrentLocation().getInstantTime().equals("I")
+        ? BucketType.INSERT
+        : BucketType.UPDATE;
+    if (WriteOperationType.isChangingRecords(operationType)) {
+      handleUpsertPartition(
+          instantTime,
+          partitionPath,
+          fileId, bucketType,
+          inputRecords.iterator())
+          .forEachRemaining(writeStatuses::addAll);
+    } else {
+      handleUpsertPartition(
+          instantTime,
+          partitionPath,
+          fileId,
+          bucketType,
+          inputRecords.iterator())
+          .forEachRemaining(writeStatuses::addAll);
+    }
+    setUpWriteMetadata(writeStatuses, result);
     return result;
   }
 
-  protected void updateIndex(List<WriteStatus> writeStatuses, 
HoodieWriteMetadata<List<WriteStatus>> result) {
-    Instant indexStartTime = Instant.now();
-    // Update the index back
-    List<WriteStatus> statuses = 
table.getIndex().updateLocation(writeStatuses, context, table);
-    result.setIndexUpdateDuration(Duration.between(indexStartTime, 
Instant.now()));
+  protected void setUpWriteMetadata(
+      List<WriteStatus> statuses,
+      HoodieWriteMetadata<List<WriteStatus>> result) {
+    // No need to update the index because the update happens before the write.
     result.setWriteStatuses(statuses);
+    result.setIndexUpdateDuration(Duration.ZERO);
   }
 
   @Override
@@ -139,56 +132,6 @@ public abstract class BaseFlinkCommitActionExecutor<T 
extends HoodieRecordPayloa
     return table.getMetaClient().getCommitActionType();
   }
 
-  private Partitioner getPartitioner(WorkloadProfile profile) {
-    if (WriteOperationType.isChangingRecords(operationType)) {
-      return getUpsertPartitioner(profile);
-    } else {
-      return getInsertPartitioner(profile);
-    }
-  }
-
-  private Map<Integer, List<HoodieRecord<T>>> partition(List<HoodieRecord<T>> 
dedupedRecords, Partitioner partitioner) {
-    Map<Integer, List<Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, 
HoodieRecord<T>>>> partitionedMidRecords = dedupedRecords
-        .stream()
-        .map(record -> new Tuple2<>(new Tuple2<>(record.getKey(), 
Option.ofNullable(record.getCurrentLocation())), record))
-        .collect(Collectors.groupingBy(x -> partitioner.getPartition(x._1)));
-    Map<Integer, List<HoodieRecord<T>>> results = new LinkedHashMap<>();
-    partitionedMidRecords.forEach((key, value) -> results.put(key, 
value.stream().map(x -> x._2).collect(Collectors.toList())));
-    return results;
-  }
-
-  protected Pair<HashMap<String, WorkloadStat>, WorkloadStat> 
buildProfile(List<HoodieRecord<T>> inputRecords) {
-    HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
-    WorkloadStat globalStat = new WorkloadStat();
-
-    Map<Pair<String, Option<HoodieRecordLocation>>, Long> 
partitionLocationCounts = inputRecords
-        .stream()
-        .map(record -> Pair.of(
-            Pair.of(record.getPartitionPath(), 
Option.ofNullable(record.getCurrentLocation())), record))
-        .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));
-
-    for (Map.Entry<Pair<String, Option<HoodieRecordLocation>>, Long> e : 
partitionLocationCounts.entrySet()) {
-      String partitionPath = e.getKey().getLeft();
-      Long count = e.getValue();
-      Option<HoodieRecordLocation> locOption = e.getKey().getRight();
-
-      if (!partitionPathStatMap.containsKey(partitionPath)) {
-        partitionPathStatMap.put(partitionPath, new WorkloadStat());
-      }
-
-      if (locOption.isPresent()) {
-        // update
-        partitionPathStatMap.get(partitionPath).addUpdates(locOption.get(), 
count);
-        globalStat.addUpdates(locOption.get(), count);
-      } else {
-        // insert
-        partitionPathStatMap.get(partitionPath).addInserts(count);
-        globalStat.addInserts(count);
-      }
-    }
-    return Pair.of(partitionPathStatMap, globalStat);
-  }
-
   @Override
   protected void commit(Option<Map<String, String>> extraMetadata, 
HoodieWriteMetadata<List<WriteStatus>> result) {
     commit(extraMetadata, result, 
result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
@@ -228,31 +171,28 @@ public abstract class BaseFlinkCommitActionExecutor<T 
extends HoodieRecordPayloa
   }
 
   @SuppressWarnings("unchecked")
-  protected Iterator<List<WriteStatus>> handleUpsertPartition(String 
instantTime, Integer partition, Iterator recordItr,
-                                                              Partitioner 
partitioner) {
-    UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
-    BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
-    BucketType btype = binfo.bucketType;
+  protected Iterator<List<WriteStatus>> handleUpsertPartition(
+      String instantTime,
+      String partitionPath,
+      String fileIdHint,
+      BucketType bucketType,
+      Iterator recordItr) {
     try {
-      if (btype.equals(BucketType.INSERT)) {
-        return handleInsert(binfo.fileIdPrefix, recordItr);
-      } else if (btype.equals(BucketType.UPDATE)) {
-        return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, 
recordItr);
-      } else {
-        throw new HoodieUpsertException("Unknown bucketType " + btype + " for 
partition :" + partition);
+      switch (bucketType) {
+        case INSERT:
+          return handleInsert(fileIdHint, recordItr);
+        case UPDATE:
+          return handleUpdate(partitionPath, fileIdHint, recordItr);
+        default:
+          throw new HoodieUpsertException("Unknown bucketType " + bucketType + 
" for partition :" + partitionPath);
       }
     } catch (Throwable t) {
-      String msg = "Error upserting bucketType " + btype + " for partition :" 
+ partition;
+      String msg = "Error upserting bucketType " + bucketType + " for 
partition :" + partitionPath;
       LOG.error(msg, t);
       throw new HoodieUpsertException(msg, t);
     }
   }
 
-  protected Iterator<List<WriteStatus>> handleInsertPartition(String 
instantTime, Integer partition, Iterator recordItr,
-                                                              Partitioner 
partitioner) {
-    return handleUpsertPartition(instantTime, partition, recordItr, 
partitioner);
-  }
-
   @Override
   public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String 
fileId,
                                                   Iterator<HoodieRecord<T>> 
recordItr)
@@ -293,13 +233,6 @@ public abstract class BaseFlinkCommitActionExecutor<T 
extends HoodieRecordPayloa
     }
   }
 
-  protected HoodieMergeHandle getUpdateHandle(String partitionPath, String 
fileId,
-                                              Map<String, HoodieRecord<T>> 
keyToNewRecords,
-                                              HoodieBaseFile 
dataFileToBeMerged) {
-    return new HoodieMergeHandle<>(config, instantTime, table, keyToNewRecords,
-        partitionPath, fileId, dataFileToBeMerged, taskContextSupplier);
-  }
-
   @Override
   public Iterator<List<WriteStatus>> handleInsert(String idPfx, 
Iterator<HoodieRecord<T>> recordItr)
       throws Exception {
@@ -309,24 +242,6 @@ public abstract class BaseFlinkCommitActionExecutor<T 
extends HoodieRecordPayloa
       return Collections.singletonList((List<WriteStatus>) 
Collections.EMPTY_LIST).iterator();
     }
     return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, 
table, idPfx,
-        taskContextSupplier, new CreateHandleFactory<>());
+        taskContextSupplier, new FlinkCreateHandleFactory<>());
   }
-
-  /**
-   * Provides a partitioner to perform the upsert operation, based on the 
workload profile.
-   */
-  public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
-    if (profile == null) {
-      throw new HoodieUpsertException("Need workload profile to construct the 
upsert partitioner.");
-    }
-    return new UpsertPartitioner(profile, context, table, config);
-  }
-
-  /**
-   * Provides a partitioner to perform the insert operation, based on the 
workload profile.
-   */
-  public Partitioner getInsertPartitioner(WorkloadProfile profile) {
-    return getUpsertPartitioner(profile);
-  }
-
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 191071e..5238123 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -19,17 +19,32 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 
+import java.time.Duration;
+import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+/**
+ * Overrides the {@link #write} method to not look up index and partition the 
records, because
+ * with {@code org.apache.hudi.operator.partitioner.BucketAssigner}, each 
hoodie record
+ * is tagged with a bucket ID (partition path + fileID) in streaming way. The 
FlinkWriteHelper only hands over
+ * the records to the action executor {@link BaseCommitActionExecutor} to 
execute.
+ *
+ * <p>Computing the records batch locations all at a time is a pressure to the 
engine,
+ * we should avoid that in streaming system.
+ */
 public class FlinkWriteHelper<T extends HoodieRecordPayload,R> extends 
AbstractWriteHelper<T, List<HoodieRecord<T>>,
     List<HoodieKey>, List<WriteStatus>, R> {
 
@@ -45,22 +60,45 @@ public class FlinkWriteHelper<T extends 
HoodieRecordPayload,R> extends AbstractW
   }
 
   @Override
+  public HoodieWriteMetadata<List<WriteStatus>> write(String instantTime, 
List<HoodieRecord<T>> inputRecords, HoodieEngineContext context,
+                                                      HoodieTable<T, 
List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, boolean 
shouldCombine, int shuffleParallelism,
+                                                      
BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>, R> executor, boolean performTagging) {
+    try {
+      Instant lookupBegin = Instant.now();
+      Duration indexLookupDuration = Duration.between(lookupBegin, 
Instant.now());
+
+      HoodieWriteMetadata<List<WriteStatus>> result = 
executor.execute(inputRecords);
+      result.setIndexLookupDuration(indexLookupDuration);
+      return result;
+    } catch (Throwable e) {
+      if (e instanceof HoodieUpsertException) {
+        throw (HoodieUpsertException) e;
+      }
+      throw new HoodieUpsertException("Failed to upsert for commit time " + 
instantTime, e);
+    }
+  }
+
+  @Override
   public List<HoodieRecord<T>> deduplicateRecords(List<HoodieRecord<T>> 
records,
                                                      HoodieIndex<T, 
List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> index,
                                                      int parallelism) {
-    boolean isIndexingGlobal = index.isGlobal();
     Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = 
records.stream().map(record -> {
-      HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their 
partitionPath
-      Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
+      final Object key = record.getKey().getRecordKey();
       return Pair.of(key, record);
     }).collect(Collectors.groupingBy(Pair::getLeft));
 
     return keyedRecords.values().stream().map(x -> 
x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
       @SuppressWarnings("unchecked")
       T reducedData = (T) rec1.getData().preCombine(rec2.getData());
+      // we cannot allow the user to change the key or partitionPath, since 
that will affect
+      // everything
+      // so pick it from one of the records.
       HoodieKey reducedKey = rec1.getData().equals(reducedData) ? 
rec1.getKey() : rec2.getKey();
-      return new HoodieRecord<T>(reducedKey, reducedData);
+      HoodieRecord<T> hoodieRecord = new HoodieRecord<>(reducedKey, 
reducedData);
+      // reuse the location from the first record.
+      hoodieRecord.setCurrentLocation(rec1.getCurrentLocation());
+      return hoodieRecord;
     }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
   }
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 8cc9b0d..f44e83d 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -116,10 +116,7 @@ public class UpsertPartitioner<T extends 
HoodieRecordPayload<T>> implements Part
   private int addUpdateBucket(String partitionPath, String fileIdHint) {
     int bucket = totalBuckets;
     updateLocationToBucket.put(fileIdHint, bucket);
-    BucketInfo bucketInfo = new BucketInfo();
-    bucketInfo.bucketType = BucketType.UPDATE;
-    bucketInfo.fileIdPrefix = fileIdHint;
-    bucketInfo.partitionPath = partitionPath;
+    BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, 
partitionPath);
     bucketInfoMap.put(totalBuckets, bucketInfo);
     totalBuckets++;
     return bucket;
@@ -186,10 +183,7 @@ public class UpsertPartitioner<T extends 
HoodieRecordPayload<T>> implements Part
             } else {
               recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 
1) * insertRecordsPerBucket);
             }
-            BucketInfo bucketInfo = new BucketInfo();
-            bucketInfo.bucketType = BucketType.INSERT;
-            bucketInfo.partitionPath = partitionPath;
-            bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
+            BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, 
FSUtils.createNewFileIdPfx(), partitionPath);
             bucketInfoMap.put(totalBuckets, bucketInfo);
             totalBuckets++;
           }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 4f19203..eeeeacf 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -114,10 +114,7 @@ public class UpsertPartitioner<T extends 
HoodieRecordPayload<T>> implements Part
   private int addUpdateBucket(String partitionPath, String fileIdHint) {
     int bucket = totalBuckets;
     updateLocationToBucket.put(fileIdHint, bucket);
-    BucketInfo bucketInfo = new BucketInfo();
-    bucketInfo.bucketType = BucketType.UPDATE;
-    bucketInfo.fileIdPrefix = fileIdHint;
-    bucketInfo.partitionPath = partitionPath;
+    BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, 
partitionPath);
     bucketInfoMap.put(totalBuckets, bucketInfo);
     totalBuckets++;
     return bucket;
@@ -184,10 +181,7 @@ public class UpsertPartitioner<T extends 
HoodieRecordPayload<T>> implements Part
             } else {
               recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 
1) * insertRecordsPerBucket);
             }
-            BucketInfo bucketInfo = new BucketInfo();
-            bucketInfo.bucketType = BucketType.INSERT;
-            bucketInfo.partitionPath = partitionPath;
-            bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
+            BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, 
FSUtils.createNewFileIdPfx(), partitionPath);
             bucketInfoMap.put(totalBuckets, bucketInfo);
             totalBuckets++;
           }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index ee153c8..9d60cde 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -120,10 +120,7 @@ public class UpsertPartitioner<T extends 
HoodieRecordPayload<T>> extends Partiti
   private int addUpdateBucket(String partitionPath, String fileIdHint) {
     int bucket = totalBuckets;
     updateLocationToBucket.put(fileIdHint, bucket);
-    BucketInfo bucketInfo = new BucketInfo();
-    bucketInfo.bucketType = BucketType.UPDATE;
-    bucketInfo.fileIdPrefix = fileIdHint;
-    bucketInfo.partitionPath = partitionPath;
+    BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, 
partitionPath);
     bucketInfoMap.put(totalBuckets, bucketInfo);
     totalBuckets++;
     return bucket;
@@ -223,10 +220,7 @@ public class UpsertPartitioner<T extends 
HoodieRecordPayload<T>> extends Partiti
             } else {
               recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 
1) * insertRecordsPerBucket);
             }
-            BucketInfo bucketInfo = new BucketInfo();
-            bucketInfo.bucketType = BucketType.INSERT;
-            bucketInfo.partitionPath = partitionPath;
-            bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
+            BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, 
FSUtils.createNewFileIdPfx(), partitionPath);
             bucketInfoMap.put(totalBuckets, bucketInfo);
             totalBuckets++;
           }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index fcb4fd9..865f0dc 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -167,6 +167,12 @@ public class HoodieActiveTimeline extends 
HoodieDefaultTimeline {
     deleteInstantFile(instant);
   }
 
+  public void deletePending(HoodieInstant.State state, String action, String 
instantStr) {
+    HoodieInstant instant = new HoodieInstant(state, action, instantStr);
+    ValidationUtils.checkArgument(!instant.isCompleted());
+    deleteInstantFile(instant);
+  }
+
   public void deleteCompactionRequested(HoodieInstant instant) {
     ValidationUtils.checkArgument(instant.isRequested());
     ValidationUtils.checkArgument(Objects.equals(instant.getAction(), 
HoodieTimeline.COMPACTION_ACTION));
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
 
b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
index 5c9930d..dea8a05 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
@@ -40,7 +40,6 @@ import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
@@ -115,7 +114,7 @@ public class InstantGenerateOperator extends 
AbstractStreamOperator<HoodieRecord
       writeClient = new HoodieFlinkWriteClient(new 
HoodieFlinkEngineContext(taskContextSupplier), 
StreamerUtil.getHoodieClientConfig(cfg), true);
 
       // init table, create it if not exists.
-      initTable();
+      StreamerUtil.initTableIfNotExists(FlinkOptions.fromStreamerConfig(cfg));
 
       // create instant marker directory
       createInstantMarkerDir();
@@ -189,6 +188,7 @@ public class InstantGenerateOperator extends 
AbstractStreamOperator<HoodieRecord
    */
   private String startNewInstant(long checkpointId) {
     String newTime = writeClient.startCommit();
+    this.writeClient.transitionRequestedToInflight(this.cfg.tableType, 
newTime);
     LOG.info("create instant [{}], at checkpoint [{}]", newTime, checkpointId);
     return newTime;
   }
@@ -218,20 +218,6 @@ public class InstantGenerateOperator extends 
AbstractStreamOperator<HoodieRecord
     throw new InterruptedException(String.format("Last instant costs more than 
%s second, stop task now", retryTimes * retryInterval));
   }
 
-
-  /**
-   * Create table if not exists.
-   */
-  private void initTable() throws IOException {
-    if (!fs.exists(new Path(cfg.targetBasePath))) {
-      HoodieTableMetaClient.initTableType(new 
Configuration(serializableHadoopConf.get()), cfg.targetBasePath,
-          HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, 
"archived", cfg.payloadClassName, 1);
-      LOG.info("Table initialized");
-    } else {
-      LOG.info("Table already [{}/{}] exists, do nothing here", 
cfg.targetBasePath, cfg.targetTableName);
-    }
-  }
-
   @Override
   public void close() throws Exception {
     if (writeClient != null) {
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
 
b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
index a59a995..4309bb0 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.operator;
 
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
@@ -28,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.exception.HoodieFlinkStreamerException;
+import org.apache.hudi.table.action.commit.FlinkWriteHelper;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -40,8 +42,10 @@ import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A {@link KeyedProcessFunction} where the write operations really happens.
@@ -52,7 +56,7 @@ public class KeyedWriteProcessFunction extends 
KeyedProcessFunction<String, Hood
   /**
    * Records buffer, will be processed in snapshotState function.
    */
-  private List<HoodieRecord> bufferedRecords = new LinkedList<>();
+  private Map<String, List<HoodieRecord>> bufferedRecords;
 
   /**
    * Flink collector help s to send data downstream.
@@ -88,6 +92,8 @@ public class KeyedWriteProcessFunction extends 
KeyedProcessFunction<String, Hood
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
 
+    this.bufferedRecords = new LinkedHashMap<>();
+
     indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
 
     cfg = (FlinkStreamerConfig) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
@@ -112,17 +118,24 @@ public class KeyedWriteProcessFunction extends 
KeyedProcessFunction<String, Hood
         String instantTimestamp = latestInstant;
         LOG.info("Write records, subtask id = [{}]  checkpoint_id = [{}}] 
instant = [{}], record size = [{}]", indexOfThisSubtask, 
context.getCheckpointId(), instantTimestamp, bufferedRecords.size());
 
-        List<WriteStatus> writeStatus;
-        switch (cfg.operation) {
-          case INSERT:
-            writeStatus = writeClient.insert(bufferedRecords, 
instantTimestamp);
-            break;
-          case UPSERT:
-            writeStatus = writeClient.upsert(bufferedRecords, 
instantTimestamp);
-            break;
-          default:
-            throw new HoodieFlinkStreamerException("Unknown operation : " + 
cfg.operation);
-        }
+        final List<WriteStatus> writeStatus = new ArrayList<>();
+        this.bufferedRecords.values().forEach(records -> {
+          if (records.size() > 0) {
+            if (cfg.filterDupes) {
+              records = 
FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, 
-1);
+            }
+            switch (cfg.operation) {
+              case INSERT:
+                writeStatus.addAll(writeClient.insert(records, 
instantTimestamp));
+                break;
+              case UPSERT:
+                writeStatus.addAll(writeClient.upsert(records, 
instantTimestamp));
+                break;
+              default:
+                throw new HoodieFlinkStreamerException("Unknown operation : " 
+ cfg.operation);
+            }
+          }
+        });
         output.collect(new Tuple3<>(instantTimestamp, writeStatus, 
indexOfThisSubtask));
         bufferedRecords.clear();
       }
@@ -144,7 +157,7 @@ public class KeyedWriteProcessFunction extends 
KeyedProcessFunction<String, Hood
     }
 
     // buffer the records
-    bufferedRecords.add(hoodieRecord);
+    putDataIntoBuffer(hoodieRecord);
   }
 
   public boolean hasRecordsIn() {
@@ -155,6 +168,15 @@ public class KeyedWriteProcessFunction extends 
KeyedProcessFunction<String, Hood
     return latestInstant;
   }
 
+  private void putDataIntoBuffer(HoodieRecord<?> record) {
+    final String fileId = record.getCurrentLocation().getFileId();
+    final String key = 
StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
+    if (!this.bufferedRecords.containsKey(key)) {
+      this.bufferedRecords.put(key, new ArrayList<>());
+    }
+    this.bufferedRecords.get(key).add(record);
+  }
+
   @Override
   public void close() {
     if (writeClient != null) {
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java 
b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
index 34a61d4..5877098 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
@@ -18,22 +18,18 @@
 
 package org.apache.hudi.operator;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
-import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.table.action.commit.FlinkWriteHelper;
 import org.apache.hudi.util.StreamerUtil;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
@@ -41,7 +37,6 @@ import 
org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -50,7 +45,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiFunction;
@@ -96,7 +93,7 @@ public class StreamWriteFunction<K, I, O> extends 
KeyedProcessFunction<K, I, O>
   /**
    * Write buffer for a checkpoint.
    */
-  private transient List<HoodieRecord> buffer;
+  private transient Map<String, List<HoodieRecord>> buffer;
 
   /**
    * The buffer lock to control data buffering/flushing.
@@ -131,23 +128,6 @@ public class StreamWriteFunction<K, I, O> extends 
KeyedProcessFunction<K, I, O>
   private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> 
writeFunction;
 
   /**
-   * HoodieKey generator.
-   */
-  private transient KeyGenerator keyGenerator;
-
-  /**
-   * Row type of the input.
-   */
-  private final RowType rowType;
-
-  /**
-   * Avro schema of the input.
-   */
-  private final Schema avroSchema;
-
-  private transient RowDataToAvroConverters.RowDataToAvroConverter converter;
-
-  /**
    * The REQUESTED instant we write the data.
    */
   private volatile String currentInstant;
@@ -160,20 +140,15 @@ public class StreamWriteFunction<K, I, O> extends 
KeyedProcessFunction<K, I, O>
   /**
    * Constructs a StreamingSinkFunction.
    *
-   * @param rowType The input row type
    * @param config  The config options
    */
-  public StreamWriteFunction(RowType rowType, Configuration config) {
-    this.rowType = rowType;
-    this.avroSchema = StreamerUtil.getSourceSchema(config);
+  public StreamWriteFunction(Configuration config) {
     this.config = config;
   }
 
   @Override
   public void open(Configuration parameters) throws IOException {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
-    this.keyGenerator = 
StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config));
-    this.converter = RowDataToAvroConverters.createConverter(this.rowType);
     initBuffer();
     initWriteClient();
     initWriteFunction();
@@ -211,7 +186,7 @@ public class StreamWriteFunction<K, I, O> extends 
KeyedProcessFunction<K, I, O>
       if (onCheckpointing) {
         addToBufferCondition.await();
       }
-      this.buffer.add(toHoodieRecord(value));
+      putDataIntoBuffer(value);
     } finally {
       bufferLock.unlock();
     }
@@ -230,7 +205,7 @@ public class StreamWriteFunction<K, I, O> extends 
KeyedProcessFunction<K, I, O>
 
   @VisibleForTesting
   @SuppressWarnings("rawtypes")
-  public List<HoodieRecord> getBuffer() {
+  public Map<String, List<HoodieRecord>> getBuffer() {
     return buffer;
   }
 
@@ -249,7 +224,7 @@ public class StreamWriteFunction<K, I, O> extends 
KeyedProcessFunction<K, I, O>
   // -------------------------------------------------------------------------
 
   private void initBuffer() {
-    this.buffer = new ArrayList<>();
+    this.buffer = new LinkedHashMap<>();
     this.bufferLock = new ReentrantLock();
     this.addToBufferCondition = this.bufferLock.newCondition();
   }
@@ -277,32 +252,33 @@ public class StreamWriteFunction<K, I, O> extends 
KeyedProcessFunction<K, I, O>
     }
   }
 
-  /**
-   * Converts the give record to a {@link HoodieRecord}.
-   *
-   * @param record The input record
-   * @return HoodieRecord based on the configuration
-   * @throws IOException if error occurs
-   */
-  @SuppressWarnings("rawtypes")
-  private HoodieRecord toHoodieRecord(I record) throws IOException {
-    boolean shouldCombine = 
this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
-        || 
WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == 
WriteOperationType.UPSERT;
-    GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, 
record);
-    final String payloadClazz = 
this.config.getString(FlinkOptions.PAYLOAD_CLASS);
-    Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
-        this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false);
-    HoodieRecordPayload payload = shouldCombine
-        ? StreamerUtil.createPayload(payloadClazz, gr, orderingVal)
-        : StreamerUtil.createPayload(payloadClazz, gr);
-    return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
+  private void putDataIntoBuffer(I value) {
+    HoodieRecord<?> record = (HoodieRecord<?>) value;
+    final String fileId = record.getCurrentLocation().getFileId();
+    final String key = 
StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
+    if (!this.buffer.containsKey(key)) {
+      this.buffer.put(key, new ArrayList<>());
+    }
+    this.buffer.get(key).add(record);
   }
 
+  @SuppressWarnings("unchecked, rawtypes")
   private void flushBuffer() {
     final List<WriteStatus> writeStatus;
     if (buffer.size() > 0) {
-      writeStatus = writeFunction.apply(buffer, currentInstant);
-      buffer.clear();
+      writeStatus = new ArrayList<>();
+      this.buffer.values()
+          // The records are partitioned by the bucket ID and each batch sent 
to
+          // the writer belongs to one bucket.
+          .forEach(records -> {
+            if (records.size() > 0) {
+              if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
+                records = 
FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, 
-1);
+              }
+              writeStatus.addAll(writeFunction.apply(records, currentInstant));
+            }
+          });
+      this.buffer.clear();
     } else {
       LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, 
currentInstant);
       writeStatus = Collections.emptyList();
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java 
b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java
index 3f4d940..247269c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.table.types.logical.RowType;
 
 /**
  * Operator for {@link StreamSink}.
@@ -36,8 +35,8 @@ public class StreamWriteOperator<I>
     implements OperatorEventHandler {
   private final StreamWriteFunction<Object, I, Object> sinkFunction;
 
-  public StreamWriteOperator(RowType rowType, Configuration conf) {
-    super(new StreamWriteFunction<>(rowType, conf));
+  public StreamWriteOperator(Configuration conf) {
+    super(new StreamWriteFunction<>(conf));
     this.sinkFunction = (StreamWriteFunction<Object, I, Object>) 
getUserFunction();
   }
 
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
 
b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
index 524c601..bf0cfc2 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
@@ -22,9 +22,6 @@ import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
@@ -38,8 +35,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,6 +54,8 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
+
 /**
  * {@link OperatorCoordinator} for {@link StreamWriteFunction}.
  *
@@ -121,7 +118,7 @@ public class StreamWriteOperatorCoordinator
     // writeClient
     initWriteClient();
     // init table, create it if not exists.
-    initTable();
+    initTableIfNotExists(this.conf);
   }
 
   @Override
@@ -139,6 +136,7 @@ public class StreamWriteOperatorCoordinator
           + " data has not finish writing, roll back the last write and throw";
       checkAndForceCommit(errMsg);
       this.inFlightInstant = this.writeClient.startCommit();
+      
this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE),
 this.inFlightInstant);
       this.inFlightCheckpoint = checkpointId;
       LOG.info("Create instant [{}], at checkpoint [{}]", 
this.inFlightInstant, checkpointId);
       result.complete(writeCheckpointBytes());
@@ -200,28 +198,6 @@ public class StreamWriteOperatorCoordinator
         true);
   }
 
-  private void initTable() throws IOException {
-    final String basePath = this.conf.getString(FlinkOptions.PATH);
-    final org.apache.hadoop.conf.Configuration hadoopConf = 
StreamerUtil.getHadoopConf();
-    // Hadoop FileSystem
-    try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
-      if (!fs.exists(new Path(basePath, 
HoodieTableMetaClient.METAFOLDER_NAME))) {
-        HoodieTableMetaClient.initTableType(
-            hadoopConf,
-            basePath,
-            
HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)),
-            this.conf.getString(FlinkOptions.TABLE_NAME),
-            "archived",
-            this.conf.getString(FlinkOptions.PAYLOAD_CLASS),
-            1);
-        LOG.info("Table initialized");
-      } else {
-        LOG.info("Table [{}/{}] already exists, no need to initialize the 
table",
-            basePath, this.conf.getString(FlinkOptions.TABLE_NAME));
-      }
-    }
-  }
-
   static byte[] readBytes(DataInputStream in, int size) throws IOException {
     byte[] bytes = new byte[size];
     in.readFully(bytes);
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
 
b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
index f5faa54..5626745 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.table.types.logical.RowType;
 
 /**
  * Factory class for {@link StreamWriteOperator}.
@@ -43,10 +42,9 @@ public class StreamWriteOperatorFactory<I>
   private final int numTasks;
 
   public StreamWriteOperatorFactory(
-      RowType rowType,
       Configuration conf,
       int numTasks) {
-    super(new StreamWriteOperator<>(rowType, conf));
+    super(new StreamWriteOperator<>(conf));
     this.operator = (StreamWriteOperator<I>) getOperator();
     this.conf = conf;
     this.numTasks = numTasks;
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
 
b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
new file mode 100644
index 0000000..269ccc8
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.partitioner;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * The function that builds the write profile incrementally for records within a 
checkpoint;
+ * it then assigns a bucket ID to each record using the {@link BucketAssigner}.
+ *
+ * <p>All the records are tagged with HoodieRecordLocation, instead of real 
instant time,
+ * INSERT record uses "I" and UPSERT record uses "U" as instant time. There is 
no need to keep
+ * the "real" instant time for each record, the bucket ID (partition path & 
fileID) actually decides
+ * where the record should be written. The "I" and "U" tags are only used by 
downstream operators to decide whether
+ * the data bucket is an INSERT or an UPSERT; we should factor them out when 
the underlying writer
+ * supports specifying the bucket type explicitly.
+ *
+ * <p>The output records should then be shuffled by the bucket ID so that the 
write can scale.
+ *
+ * @see BucketAssigner
+ */
+public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
+    extends KeyedProcessFunction<K, I, O>
+    implements CheckpointedFunction, CheckpointListener {
+
+  private MapState<HoodieKey, HoodieRecordLocation> indexState;
+
+  private BucketAssigner bucketAssigner;
+
+  private final Configuration conf;
+
+  private final boolean isChangingRecords;
+
+  public BucketAssignFunction(Configuration conf) {
+    this.conf = conf;
+    this.isChangingRecords = WriteOperationType.isChangingRecords(
+        WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    HoodieWriteConfig writeConfig = 
StreamerUtil.getHoodieClientConfig(this.conf);
+    HoodieFlinkEngineContext context =
+        new HoodieFlinkEngineContext(
+            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
+            new FlinkTaskContextSupplier(getRuntimeContext()));
+    this.bucketAssigner = new BucketAssigner(
+        context,
+        writeConfig);
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) {
+    this.bucketAssigner.reset();
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) {
+    MapStateDescriptor<HoodieKey, HoodieRecordLocation> indexStateDesc =
+        new MapStateDescriptor<>(
+            "indexState",
+            TypeInformation.of(HoodieKey.class),
+            TypeInformation.of(HoodieRecordLocation.class));
+    indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void processElement(I value, Context ctx, Collector<O> out) throws 
Exception {
+    // 1. put the record into the BucketAssigner;
+    // 2. look up the state for location, if the record has a location, just 
send it out;
+    // 3. if it is an INSERT, decide the location using the BucketAssigner 
then send it out.
+    HoodieRecord<?> record = (HoodieRecord<?>) value;
+    final HoodieKey hoodieKey = record.getKey();
+    final BucketInfo bucketInfo;
+    final HoodieRecordLocation location;
+    // Only changing records need looking up the index for the location,
+    // append only records are always recognized as INSERT.
+    if (isChangingRecords && this.indexState.contains(hoodieKey)) {
+      // Set up the instant time as "U" to mark the bucket as an update bucket.
+      location = new HoodieRecordLocation("U", 
this.indexState.get(hoodieKey).getFileId());
+      this.bucketAssigner.addUpdate(record.getPartitionPath(), 
location.getFileId());
+    } else {
+      bucketInfo = this.bucketAssigner.addInsert(hoodieKey.getPartitionPath());
+      switch (bucketInfo.getBucketType()) {
+        case INSERT:
+          // This is an insert bucket, use HoodieRecordLocation instant time 
as "I".
+          // Downstream operators can then check the instant time to know 
whether
+          // a record belongs to an insert bucket.
+          location = new HoodieRecordLocation("I", 
bucketInfo.getFileIdPrefix());
+          break;
+        case UPDATE:
+          location = new HoodieRecordLocation("U", 
bucketInfo.getFileIdPrefix());
+          break;
+        default:
+          throw new AssertionError();
+      }
+      this.indexState.put(hoodieKey, location);
+    }
+    record.unseal();
+    record.setCurrentLocation(location);
+    record.seal();
+    out.collect((O) record);
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long l) {
+    // Refresh the table state when there are new commits.
+    this.bucketAssigner.refreshTable();
+  }
+}
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java
 
b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java
new file mode 100644
index 0000000..f87a802
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.partitioner;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.table.action.commit.BucketType;
+import org.apache.hudi.table.action.commit.SmallFile;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Bucket assigner that assigns the data buffer of one checkpoint into buckets.
+ *
+ * <p>This assigner assigns the record one by one.
+ * If the record is an update, checks and reuses the existing UPDATE bucket or 
generates a new one;
+ * If the record is an insert, checks the record partition for small files 
first, tries to find a small file
+ * that has space to append new records and reuses the small file's data 
bucket; if
+ * there is no small file (or no space left for new records), generates an 
INSERT bucket.
+ *
+ * <p>Use {partition}_{fileId} as the bucket identifier, so that the bucket is 
unique
+ * within and among partitions.
+ */
+public class BucketAssigner {
+  private static final Logger LOG = LogManager.getLogger(BucketAssigner.class);
+
+  /**
+   * Remembers what type each bucket is for later.
+   */
+  private final HashMap<String, BucketInfo> bucketInfoMap;
+
+  private HoodieTable<?, ?, ?, ?> table;
+
+  /**
+   * Flink engine context.
+   */
+  private final HoodieFlinkEngineContext context;
+
+  /**
+   * The write config.
+   */
+  private final HoodieWriteConfig config;
+
+  /**
+   * The average record size.
+   */
+  private final long averageRecordSize;
+
+  /**
+   * Total records to write for each bucket based on
+   * the config option {@link 
org.apache.hudi.config.HoodieStorageConfig#PARQUET_FILE_MAX_BYTES}.
+   */
+  private final long insertRecordsPerBucket;
+
+  /**
+   * Partition path to small files mapping.
+   */
+  private final Map<String, List<SmallFile>> partitionSmallFilesMap;
+
+  /**
+   * Bucket ID(partition + fileId) -> small file assign state.
+   */
+  private final Map<String, SmallFileAssignState> smallFileAssignStates;
+
+  /**
+   * Bucket ID(partition + fileId) -> new file assign state.
+   */
+  private final Map<String, NewFileAssignState> newFileAssignStates;
+
+  public BucketAssigner(
+      HoodieFlinkEngineContext context,
+      HoodieWriteConfig config) {
+    bucketInfoMap = new HashMap<>();
+    partitionSmallFilesMap = new HashMap<>();
+    smallFileAssignStates = new HashMap<>();
+    newFileAssignStates = new HashMap<>();
+    this.context = context;
+    this.config = config;
+    this.table = HoodieFlinkTable.create(this.config, this.context);
+    averageRecordSize = averageBytesPerRecord(
+        
table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        config);
+    LOG.info("AvgRecordSize => " + averageRecordSize);
+    insertRecordsPerBucket = config.shouldAutoTuneInsertSplits()
+        ? config.getParquetMaxFileSize() / averageRecordSize
+        : config.getCopyOnWriteInsertSplitSize();
+    LOG.info("InsertRecordsPerBucket => " + insertRecordsPerBucket);
+  }
+
+  /**
+   * Resets the states of this assigner. Should be invoked once for each checkpoint,
+   * because all the states are accumulated within one checkpoint interval.
+   *
+   * <p>Clears the bucket info, the per-partition small file lists and both the
+   * small-file and new-file assign states. The table handle is left untouched.
+   */
+  public void reset() {
+    bucketInfoMap.clear();
+    partitionSmallFilesMap.clear();
+    smallFileAssignStates.clear();
+    newFileAssignStates.clear();
+  }
+
+  /**
+   * Returns the UPDATE bucket for the given partition path and file id,
+   * registering a new one when the bucket is not known yet.
+   *
+   * @param partitionPath the partition path of the record
+   * @param fileIdHint    the id of the file the record should be written to
+   * @return the bucket registered under {partition path}_{file id}
+   */
+  public BucketInfo addUpdate(String partitionPath, String fileIdHint) {
+    final String bucketId = StreamerUtil.generateBucketKey(partitionPath, fileIdHint);
+    // Only creates the bucket on first sight; an existing bucket is returned as is.
+    return bucketInfoMap.computeIfAbsent(
+        bucketId,
+        ignored -> new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath));
+  }
+
+  /**
+   * Assigns a bucket to one INSERT record of the given partition.
+   *
+   * <p>The small files of the partition are tried first: the record goes to the
+   * first small file that still has room, using that file's UPDATE bucket.
+   * Otherwise the record goes to the partition's current INSERT bucket. When
+   * there is no such bucket yet, or it has reached its capacity, a new INSERT
+   * bucket with a fresh file id prefix is created and becomes the current one.
+   *
+   * @param partitionPath the partition path of the record
+   * @return the bucket the record is assigned to
+   */
+  public BucketInfo addInsert(String partitionPath) {
+    // for new inserts, compute buckets depending on how many records we have for each partition
+    List<SmallFile> smallFiles = getSmallFilesForPartition(partitionPath);
+
+    // first try packing this into one of the smallFiles
+    for (SmallFile smallFile : smallFiles) {
+      final String key = StreamerUtil.generateBucketKey(partitionPath, smallFile.location.getFileId());
+      SmallFileAssignState assignState = smallFileAssignStates.get(key);
+      assert assignState != null;
+      if (assignState.canAssign()) {
+        assignState.assign();
+        // addUpdate re-uses the bucket when it already exists and creates it otherwise
+        return addUpdate(partitionPath, smallFile.location.getFileId());
+      }
+    }
+
+    // if we have anything more, re-use the current insert bucket while it has room
+    NewFileAssignState currentState = newFileAssignStates.get(partitionPath);
+    if (currentState != null && currentState.canAssign()) {
+      currentState.assign();
+      final String key = StreamerUtil.generateBucketKey(partitionPath, currentState.fileId);
+      return bucketInfoMap.get(key);
+    }
+
+    // No insert bucket yet, or the current one is full: roll over to a new file.
+    // Returning the full bucket here would let a single file grow without bound.
+    BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
+    final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix());
+    bucketInfoMap.put(key, bucketInfo);
+    NewFileAssignState newState = new NewFileAssignState(bucketInfo.getFileIdPrefix(), insertRecordsPerBucket);
+    // Count the record that triggered the creation of this bucket as well;
+    // guarded because a non-positive capacity can never accept an assignment.
+    if (newState.canAssign()) {
+      newState.assign();
+    }
+    newFileAssignStates.put(partitionPath, newState);
+    return bucketInfo;
+  }
+
+  private List<SmallFile> getSmallFilesForPartition(String partitionPath) {
+    if (partitionSmallFilesMap.containsKey(partitionPath)) {
+      return partitionSmallFilesMap.get(partitionPath);
+    }
+    List<SmallFile> smallFiles = getSmallFiles(partitionPath);
+    if (smallFiles.size() > 0) {
+      LOG.info("For partitionPath : " + partitionPath + " Small Files => " + 
smallFiles);
+      partitionSmallFilesMap.put(partitionPath, smallFiles);
+      smallFiles.forEach(smallFile ->
+          smallFileAssignStates.put(
+              StreamerUtil.generateBucketKey(partitionPath, 
smallFile.location.getFileId()),
+              new SmallFileAssignState(config.getParquetMaxFileSize(), 
smallFile, averageRecordSize)));
+      return smallFiles;
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   * Refresh the table state like TableFileSystemView and HoodieTimeline.
+   *
+   * <p>Replaces the held table handle with a newly created one built from the
+   * same write config and engine context, so later small file lookups go
+   * through the new handle.
+   */
+  public void refreshTable() {
+    this.table = HoodieFlinkTable.create(this.config, this.context);
+  }
+
+  /**
+   * Returns a list of small files in the given partition path.
+   */
+  protected List<SmallFile> getSmallFiles(String partitionPath) {
+
+    // smallFiles only for partitionPath
+    List<SmallFile> smallFileLocations = new ArrayList<>();
+
+    HoodieTimeline commitTimeline = 
table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
+
+    if (!commitTimeline.empty()) { // if we have some commits
+      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+      List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
+          .getLatestBaseFilesBeforeOrOn(partitionPath, 
latestCommitTime.getTimestamp()).collect(Collectors.toList());
+
+      for (HoodieBaseFile file : allFiles) {
+        if (file.getFileSize() < config.getParquetSmallFileLimit()) {
+          String filename = file.getFileName();
+          SmallFile sf = new SmallFile();
+          sf.location = new 
HoodieRecordLocation(FSUtils.getCommitTime(filename), 
FSUtils.getFileId(filename));
+          sf.sizeBytes = file.getFileSize();
+          smallFileLocations.add(sf);
+        }
+      }
+    }
+
+    return smallFileLocations;
+  }
+
+  /**
+   * Obtains the average record size based on records written during previous 
commits. Used for estimating how many
+   * records pack into one file.
+   */
+  protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, 
HoodieWriteConfig hoodieWriteConfig) {
+    long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
+    long fileSizeThreshold = (long) 
(hoodieWriteConfig.getRecordSizeEstimationThreshold() * 
hoodieWriteConfig.getParquetSmallFileLimit());
+    try {
+      if (!commitTimeline.empty()) {
+        // Go over the reverse ordered commits to get a more recent estimate 
of average record size.
+        Iterator<HoodieInstant> instants = 
commitTimeline.getReverseOrderedInstants().iterator();
+        while (instants.hasNext()) {
+          HoodieInstant instant = instants.next();
+          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+              .fromBytes(commitTimeline.getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
+          long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
+          long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
+          if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 
0) {
+            avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / 
totalRecordsWritten);
+            break;
+          }
+        }
+      }
+    } catch (Throwable t) {
+      // make this fail safe.
+      LOG.error("Error trying to compute average bytes/record ", t);
+    }
+    return avgSize;
+  }
+
+  /**
+   * Candidate bucket state for a small file. It keeps the total number of records
+   * the file can still take (derived from the space left up to the max file size
+   * and the average record size) and the number of records assigned so far.
+   */
+  private static class SmallFileAssignState {
+    long assigned;
+    long totalUnassigned;
+
+    SmallFileAssignState(long parquetMaxFileSize, SmallFile smallFile, long averageRecordSize) {
+      final long bytesLeft = parquetMaxFileSize - smallFile.sizeBytes;
+      totalUnassigned = bytesLeft / averageRecordSize;
+      assigned = 0;
+    }
+
+    /**
+     * Returns whether one more record can be assigned to this small file.
+     */
+    public boolean canAssign() {
+      if (totalUnassigned <= 0) {
+        return false;
+      }
+      return assigned < totalUnassigned;
+    }
+
+    /**
+     * Counts one more assigned record. Remember to invoke {@link #canAssign()} first,
+     * the state check fails otherwise.
+     */
+    public void assign() {
+      final String errorMessage = "Can not assign insert to small file: assigned => "
+          + assigned + " totalUnassigned => " + totalUnassigned;
+      Preconditions.checkState(canAssign(), errorMessage);
+      assigned++;
+    }
+  }
+
+  /**
+   * Candidate bucket state for a new file. It keeps the id of the file, the total
+   * number of records the bucket can take and the number of records assigned so far.
+   */
+  private static class NewFileAssignState {
+    long assigned;
+    long totalUnassigned;
+    final String fileId;
+
+    NewFileAssignState(String fileId, long insertRecordsPerBucket) {
+      this.fileId = fileId;
+      totalUnassigned = insertRecordsPerBucket;
+      assigned = 0;
+    }
+
+    /**
+     * Returns whether one more record can be assigned to this new file.
+     */
+    public boolean canAssign() {
+      if (totalUnassigned <= 0) {
+        return false;
+      }
+      return assigned < totalUnassigned;
+    }
+
+    /**
+     * Counts one more assigned record. Remember to invoke {@link #canAssign()} first,
+     * the state check fails otherwise.
+     */
+    public void assign() {
+      final String errorMessage = "Can not assign insert to new file: assigned => "
+          + assigned + " totalUnassigned => " + totalUnassigned;
+      Preconditions.checkState(canAssign(), errorMessage);
+      assigned++;
+    }
+  }
+}
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java
 
b/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java
new file mode 100644
index 0000000..2d47c79
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.transform;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+
+/**
+ * Function that transforms RowData to HoodieRecord.
+ */
+public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?>>
+    extends RichMapFunction<I, O> {
+  /**
+   * Row type of the input.
+   */
+  private final RowType rowType;
+
+  /**
+   * Avro schema of the input.
+   */
+  private transient Schema avroSchema;
+
+  /**
+   * RowData to Avro record converter.
+   */
+  private transient RowDataToAvroConverters.RowDataToAvroConverter converter;
+
+  /**
+   * HoodieKey generator.
+   */
+  private transient KeyGenerator keyGenerator;
+
+  /**
+   * Whether the payload is created with an ordering value: true when duplicates
+   * should be dropped on insert or when the write operation is UPSERT.
+   * Resolved once in {@link #open(Configuration)}.
+   */
+  private transient boolean shouldCombine;
+
+  /**
+   * Name of the payload class, resolved once in {@link #open(Configuration)}.
+   */
+  private transient String payloadClazz;
+
+  /**
+   * Name of the field the ordering value is read from, resolved once in {@link #open(Configuration)}.
+   */
+  private transient String precombineField;
+
+  /**
+   * Config options.
+   */
+  private final Configuration config;
+
+  public RowDataToHoodieFunction(RowType rowType, Configuration config) {
+    this.rowType = rowType;
+    this.config = config;
+  }
+
+  /**
+   * Sets up the schema, the converter and the key generator, and resolves the
+   * per-job options once so that they are not re-read and re-parsed for every record.
+   */
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    this.avroSchema = StreamerUtil.getSourceSchema(this.config);
+    this.converter = RowDataToAvroConverters.createConverter(this.rowType);
+    this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config));
+    this.shouldCombine = this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
+        || WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
+    this.payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS);
+    this.precombineField = this.config.getString(FlinkOptions.PRECOMBINE_FIELD);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public O map(I i) throws Exception {
+    return (O) toHoodieRecord(i);
+  }
+
+  /**
+   * Converts the given record to a {@link HoodieRecord}.
+   *
+   * <p>The row is first converted to an Avro record; the key comes from the key
+   * generator and the payload is created with or without the ordering value
+   * depending on whether records need to be combined.
+   *
+   * @param record The input record
+   * @return HoodieRecord based on the configuration
+   * @throws IOException if error occurs
+   */
+  @SuppressWarnings("rawtypes")
+  private HoodieRecord toHoodieRecord(I record) throws IOException {
+    GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
+    // The ordering value is read unconditionally, exactly like before the options were hoisted.
+    Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, this.precombineField, false);
+    HoodieRecordPayload payload = this.shouldCombine
+        ? StreamerUtil.createPayload(this.payloadClazz, gr, orderingVal)
+        : StreamerUtil.createPayload(this.payloadClazz, gr);
+    return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
+  }
+}
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java 
b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 7df63fa..418e2ea 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -71,8 +71,7 @@ public class FlinkStreamerConfig extends Configuration {
       + "hoodie client, schema provider, key generator and data source. For 
hoodie client props, sane defaults are "
       + "used, but recommend use to provide basic things like metrics 
endpoints, hive configs etc. For sources, refer"
       + "to individual classes, for supported properties.")
-  public String propsFilePath =
-      "file://" + System.getProperty("user.dir") + 
"/src/test/resources/delta-streamer-config/dfs-source.properties";
+  public String propsFilePath = "";
 
   @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that 
can be set in the properties file "
       + "(using the CLI parameter \"--props\") can also be passed command line 
using this parameter.")
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java 
b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index f6d75d3..d110bff 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -22,9 +22,11 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.FlinkOptions;
 import org.apache.hudi.operator.InstantGenerateOperator;
 import org.apache.hudi.operator.KeyedWriteProcessFunction;
 import org.apache.hudi.operator.KeyedWriteProcessOperator;
+import org.apache.hudi.operator.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.CommitSink;
 import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
 import org.apache.hudi.util.StreamerUtil;
@@ -34,9 +36,11 @@ import 
org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 
 import java.util.List;
@@ -66,12 +70,16 @@ public class HoodieFlinkStreamer {
       env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
     }
 
+    Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
+    int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
+
     TypedProperties props = StreamerUtil.appendKafkaProps(cfg);
 
     // add data source config
     props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, cfg.payloadClassName);
     props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, 
cfg.sourceOrderingField);
 
+    StreamerUtil.initTableIfNotExists(conf);
     // Read from kafka source
     DataStream<HoodieRecord> inputRecords =
         env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new 
SimpleStringSchema(), props))
@@ -86,13 +94,20 @@ public class HoodieFlinkStreamer {
 
         // Keyby partition path, to avoid multiple subtasks writing to a 
partition at the same time
         .keyBy(HoodieRecord::getPartitionPath)
-
+        // use the bucket assigner to generate bucket IDs
+        .transform(
+            "bucket_assigner",
+            TypeInformation.of(HoodieRecord.class),
+            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+        .uid("uid_bucket_assigner")
+        // shuffle by fileId(bucket id)
+        .keyBy(record -> record.getCurrentLocation().getFileId())
         // write operator, where the write operation really happens
         .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new 
TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
         }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
         .name("write_process")
         .uid("write_process_uid")
-        .setParallelism(env.getParallelism())
+        .setParallelism(numWriteTask)
 
         // Commit can only be executed once, so make it one parallelism
         .addSink(new CommitSink())
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java 
b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
index a8f9245..24b8994 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
@@ -18,22 +18,25 @@
 
 package org.apache.hudi.streamer;
 
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.operator.FlinkOptions;
 import org.apache.hudi.operator.StreamWriteOperatorFactory;
+import org.apache.hudi.operator.partitioner.BucketAssignFunction;
+import org.apache.hudi.operator.transform.RowDataToHoodieFunction;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 
 import com.beust.jcommander.JCommander;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
 import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
-import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Properties;
@@ -70,13 +73,8 @@ public class HoodieFlinkStreamerV2 {
             .getLogicalType();
     Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
     int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
-    StreamWriteOperatorFactory<RowData> operatorFactory =
-        new StreamWriteOperatorFactory<>(rowType, conf, numWriteTask);
-
-    int partitionFieldIndex = 
rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD));
-    LogicalType partitionFieldType = rowType.getTypeAt(partitionFieldIndex);
-    final RowData.FieldGetter partitionFieldGetter =
-        RowData.createFieldGetter(partitionFieldType, partitionFieldIndex);
+    StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
+        new StreamWriteOperatorFactory<>(conf, numWriteTask);
 
     DataStream<Object> dataStream = env.addSource(new FlinkKafkaConsumer<>(
         cfg.kafkaTopic,
@@ -89,11 +87,19 @@ public class HoodieFlinkStreamerV2 {
         ), kafkaProps))
         .name("kafka_source")
         .uid("uid_kafka_source")
+        .map(new RowDataToHoodieFunction<>(rowType, conf), 
TypeInformation.of(HoodieRecord.class))
         // Key-by partition path, to avoid multiple subtasks write to a 
partition at the same time
-        .keyBy(partitionFieldGetter::getFieldOrNull)
+        .keyBy(HoodieRecord::getPartitionPath)
+        .transform(
+            "bucket_assigner",
+            TypeInformation.of(HoodieRecord.class),
+            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+        .uid("uid_bucket_assigner")
+        // shuffle by fileId(bucket id)
+        .keyBy(record -> record.getCurrentLocation().getFileId())
         .transform("hoodie_stream_write", null, operatorFactory)
         .uid("uid_hoodie_stream_write")
-        .setParallelism(numWriteTask); // should make it configurable
+        .setParallelism(numWriteTask);
 
     env.addOperator(dataStream.getTransformation());
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java 
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 2fa8757..4447705 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
@@ -57,6 +59,7 @@ import java.util.Properties;
  * Utilities for Flink stream read and write.
  */
 public class StreamerUtil {
+  private static final String DEFAULT_ARCHIVE_LOG_FOLDER = "archived";
 
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamerUtil.class);
 
@@ -68,6 +71,9 @@ public class StreamerUtil {
   }
 
   public static TypedProperties getProps(FlinkStreamerConfig cfg) {
+    if (cfg.propsFilePath.isEmpty()) {
+      return new TypedProperties();
+    }
     return readConfig(
         FSUtils.getFs(cfg.propsFilePath, getHadoopConf()),
         new Path(cfg.propsFilePath), cfg.configs).getConfig();
@@ -208,6 +214,10 @@ public class StreamerUtil {
     }
   }
 
+  /**
+   * Builds the write config from a streamer config by first converting it with
+   * {@code FlinkOptions.fromStreamerConfig} and then delegating to the
+   * {@code Configuration} based overload.
+   */
+  public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig 
conf) {
+    return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf));
+  }
+
   public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
     HoodieWriteConfig.Builder builder =
         HoodieWriteConfig.newBuilder()
@@ -250,4 +260,37 @@ public class StreamerUtil {
     checkPropNames.forEach(prop ->
         Preconditions.checkState(props.containsKey(prop), "Required property " 
+ prop + " is missing"));
   }
+
+  /**
+   * Initializes the table if it does not exist.
+   *
+   * <p>The table is considered existing when the meta folder is present under the
+   * base path; in that case nothing is written.
+   *
+   * @param conf the configuration
+   * @throws IOException if an error happens when checking the path or writing the metadata
+   */
+  public static void initTableIfNotExists(Configuration conf) throws IOException {
+    final String basePath = conf.getString(FlinkOptions.PATH);
+    final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
+    // Deliberately NOT closed: Hadoop normally serves FileSystem instances from a
+    // shared cache, so closing this one would break every other user of the same
+    // instance in this JVM ("Filesystem closed").
+    final FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
+    if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
+      HoodieTableMetaClient.initTableType(
+          hadoopConf,
+          basePath,
+          HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
+          conf.getString(FlinkOptions.TABLE_NAME),
+          DEFAULT_ARCHIVE_LOG_FOLDER,
+          conf.getString(FlinkOptions.PAYLOAD_CLASS),
+          1);
+      LOG.info("Table initialized under base path {}", basePath);
+    } else {
+      LOG.info("Table [{}/{}] already exists, no need to initialize the table",
+          basePath, conf.getString(FlinkOptions.TABLE_NAME));
+    }
+  }
+
+  /**
+   * Generates the bucket ID using format {partition path}_{fileID}.
+   *
+   * @param partitionPath the partition path
+   * @param fileId        the file id
+   * @return the partition path and the file id joined by an underscore
+   */
+  public static String generateBucketKey(String partitionPath, String fileId) {
+    return partitionPath + "_" + fileId;
+  }
 }
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
 
b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
index c2d7a65..fea9b8f 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
@@ -27,6 +27,7 @@ import 
org.apache.hudi.operator.utils.StreamWriteFunctionWrapper;
 import org.apache.hudi.operator.utils.TestConfigurations;
 import org.apache.hudi.operator.utils.TestData;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.table.data.RowData;
 import org.hamcrest.MatcherAssert;
@@ -56,24 +57,26 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 public class StreamWriteFunctionTest {
 
-  private static final Map<String, String> EXPECTED = new HashMap<>();
-
-  static {
-    EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1,par1, 
id2,par1,id2,Stephen,33,2,par1]");
-    EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3,par2, 
id4,par2,id4,Fabian,31,4,par2]");
-    EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5,par3, 
id6,par3,id6,Emma,20,6,par3]");
-    EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7,par4, 
id8,par4,id8,Han,56,8,par4]");
-  }
+  private static final Map<String, String> EXPECTED1 = new HashMap<>();
 
   private static final Map<String, String> EXPECTED2 = new HashMap<>();
 
+  private static final Map<String, String> EXPECTED3 = new HashMap<>();
+
   static {
+    EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, 
id2,par1,id2,Stephen,33,2,par1]");
+    EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, 
id4,par2,id4,Fabian,31,4,par2]");
+    EXPECTED1.put("par3", "[id5,par3,id5,Sophia,18,5,par3, 
id6,par3,id6,Emma,20,6,par3]");
+    EXPECTED1.put("par4", "[id7,par4,id7,Bob,44,7,par4, 
id8,par4,id8,Han,56,8,par4]");
+
     EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, 
id2,par1,id2,Stephen,34,2,par1]");
     EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, 
id4,par2,id4,Fabian,32,4,par2]");
     EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, 
id6,par3,id6,Emma,20,6,par3, "
         + "id9,par3,id9,Jane,19,6,par3]");
     EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, 
id11,par4,id11,Phoebe,52,8,par4, "
         + "id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+
+    EXPECTED3.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
   }
 
   private StreamWriteFunctionWrapper<RowData> funcWrapper;
@@ -83,9 +86,7 @@ public class StreamWriteFunctionTest {
 
   @BeforeEach
   public void before() throws Exception {
-    this.funcWrapper = new StreamWriteFunctionWrapper<>(
-        tempFile.getAbsolutePath(),
-        TestConfigurations.SERIALIZER);
+    this.funcWrapper = new 
StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath());
   }
 
   @AfterEach
@@ -211,7 +212,7 @@ public class StreamWriteFunctionTest {
 
     final OperatorEvent nextEvent = funcWrapper.getNextEvent();
     assertThat("The operator expect to send an event", nextEvent, 
instanceOf(BatchWriteSuccessEvent.class));
-    checkWrittenData(tempFile, EXPECTED);
+    checkWrittenData(tempFile, EXPECTED1);
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the 
event");
@@ -220,7 +221,43 @@ public class StreamWriteFunctionTest {
     funcWrapper.checkpointComplete(1);
     // the coordinator checkpoint commits the inflight instant.
     checkInstantState(funcWrapper.getWriteClient(), 
HoodieInstant.State.COMPLETED, instant);
-    checkWrittenData(tempFile, EXPECTED);
+    checkWrittenData(tempFile, EXPECTED1);
+  }
+
+  /**
+   * Writes the same batch of identical rows twice with {@code INSERT_DROP_DUPS} enabled and
+   * verifies that only partition {@code par1} with the single {@code EXPECTED3} row is on
+   * disk after each checkpoint.
+   */
+  @Test
+  public void testInsertDuplicates() throws Exception {
+    // replace the default wrapper with one whose config drops duplicate inserts
+    final String tablePath = tempFile.getAbsolutePath();
+    Configuration dedupConf = TestConfigurations.getDefaultConf(tablePath);
+    dedupConf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
+    funcWrapper = new StreamWriteFunctionWrapper<>(tablePath, dedupConf);
+
+    // first round: open the function and feed it the duplicated rows
+    funcWrapper.openFunction();
+    for (RowData row : TestData.DATA_SET_THREE) {
+      funcWrapper.invoke(row);
+    }
+
+    // nothing is written before the checkpoint
+    assertEmptyDataFiles();
+    // the checkpoint triggers the data write and the event send
+    funcWrapper.checkpointFunction(1);
+
+    final OperatorEvent writeEvent = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", writeEvent, instanceOf(BatchWriteSuccessEvent.class));
+    checkWrittenData(tempFile, EXPECTED3, 1);
+
+    // hand the event over to the coordinator and finish the checkpoint
+    funcWrapper.getCoordinator().handleEventFromOperator(0, writeEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    funcWrapper.checkpointComplete(1);
+
+    // second round: the very same rows are inserted again
+    for (RowData row : TestData.DATA_SET_THREE) {
+      funcWrapper.invoke(row);
+    }
+
+    funcWrapper.checkpointFunction(2);
+
+    // the written data must be unchanged
+    checkWrittenData(tempFile, EXPECTED3, 1);
   }
 
   @Test
@@ -248,7 +285,7 @@ public class StreamWriteFunctionTest {
       funcWrapper.invoke(rowData);
     }
     // the data is not flushed yet
-    checkWrittenData(tempFile, EXPECTED);
+    checkWrittenData(tempFile, EXPECTED1);
     // this triggers the data write and event send
     funcWrapper.checkpointFunction(2);
 
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java 
b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
index 56f946b..f745a3c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
@@ -18,16 +18,24 @@
 
 package org.apache.hudi.operator;
 
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.operator.partitioner.BucketAssignFunction;
+import org.apache.hudi.operator.transform.RowDataToHoodieFunction;
 import org.apache.hudi.operator.utils.TestConfigurations;
 import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.sink.CommitSink;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
@@ -37,7 +45,7 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import org.apache.flink.table.data.RowData;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.TestLogger;
@@ -47,6 +55,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
@@ -74,19 +83,15 @@ public class StreamWriteITCase extends TestLogger {
     StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
     execEnv.getConfig().disableObjectReuse();
     execEnv.setParallelism(4);
-    // 1 second a time
-    execEnv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
 
-    // Read from kafka source
+    // Read from file source
     RowType rowType =
         (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
             .getLogicalType();
-    StreamWriteOperatorFactory<RowData> operatorFactory =
-        new StreamWriteOperatorFactory<>(rowType, conf, 4);
-
-    int partitionFieldIndex = 
rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD));
-    final RowData.FieldGetter partitionFieldGetter =
-        RowData.createFieldGetter(rowType.getTypeAt(partitionFieldIndex), 
partitionFieldIndex);
+    StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
+        new StreamWriteOperatorFactory<>(conf, 4);
 
     JsonRowDataDeserializationSchema deserializationSchema = new 
JsonRowDataDeserializationSchema(
         rowType,
@@ -107,17 +112,103 @@ public class StreamWriteITCase extends TestLogger {
         // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
         .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 
1000, typeInfo)
         .map(record -> 
deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        .setParallelism(4)
+        .map(new RowDataToHoodieFunction<>(rowType, conf), 
TypeInformation.of(HoodieRecord.class))
         // Key-by partition path, to avoid multiple subtasks write to a 
partition at the same time
-        .keyBy(partitionFieldGetter::getFieldOrNull)
+        .keyBy(HoodieRecord::getPartitionPath)
+        .transform(
+            "bucket_assigner",
+            TypeInformation.of(HoodieRecord.class),
+            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+        .uid("uid_bucket_assigner")
+        // shuffle by fileId(bucket id)
+        .keyBy(record -> record.getCurrentLocation().getFileId())
         .transform("hoodie_stream_write", null, operatorFactory)
-        .uid("uid_hoodie_stream_write")
-        .setParallelism(4);
+        .uid("uid_hoodie_stream_write");
     execEnv.addOperator(dataStream.getTransformation());
 
     JobClient client = 
execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
     if (client.getJobStatus().get() != JobStatus.FAILED) {
       try {
-        TimeUnit.SECONDS.sleep(10);
+        TimeUnit.SECONDS.sleep(8);
+        client.cancel();
+      } catch (Throwable var1) {
+        // ignored
+      }
+    }
+
+    TestData.checkWrittenData(tempFile, EXPECTED);
+  }
+
+  @Test
+  public void testWriteToHoodieLegacy() throws Exception {
+    FlinkStreamerConfig streamerConf = 
TestConfigurations.getDefaultStreamerConf(tempFile.getAbsolutePath());
+    Configuration conf = FlinkOptions.fromStreamerConfig(streamerConf);
+    StreamerUtil.initTableIfNotExists(conf);
+    StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(4);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getConfig().setGlobalJobParameters(streamerConf);
+
+    // Read from file source
+    RowType rowType =
+        (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+            .getLogicalType();
+
+    JsonRowDataDeserializationSchema deserializationSchema = new 
JsonRowDataDeserializationSchema(
+        rowType,
+        new RowDataTypeInfo(rowType),
+        false,
+        true,
+        TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_source.data")).toString();
+
+    TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+    format.setFilesFilter(FilePathFilter.createDefaultFilter());
+    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+    format.setCharsetName("UTF-8");
+
+    execEnv
+        // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
+        .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 
1000, typeInfo)
+        .map(record -> 
deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        .setParallelism(4)
+        .map(new RowDataToHoodieFunction<>(rowType, conf), 
TypeInformation.of(HoodieRecord.class))
+        .transform(InstantGenerateOperator.NAME, 
TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
+        .name("instant_generator")
+        .uid("instant_generator_id")
+
+        // Keyby partition path, to avoid multiple subtasks writing to a 
partition at the same time
+        .keyBy(HoodieRecord::getPartitionPath)
+        // use the bucket assigner to generate bucket IDs
+        .transform(
+            "bucket_assigner",
+            TypeInformation.of(HoodieRecord.class),
+            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+        .uid("uid_bucket_assigner")
+        // shuffle by fileId(bucket id)
+        .keyBy(record -> record.getCurrentLocation().getFileId())
+        // write operator, where the write operation really happens
+        .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new 
TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
+        }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
+        .name("write_process")
+        .uid("write_process_uid")
+        .setParallelism(4)
+
+        // Commit can only be executed once, so make it one parallelism
+        .addSink(new CommitSink())
+        .name("commit_sink")
+        .uid("commit_sink_uid")
+        .setParallelism(1);
+
+    JobClient client = 
execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
+    if (client.getJobStatus().get() != JobStatus.FAILED) {
+      try {
+        TimeUnit.SECONDS.sleep(8);
         client.cancel();
       } catch (Throwable var1) {
         // ignored
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/TestBucketAssigner.java
 
b/hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/TestBucketAssigner.java
new file mode 100644
index 0000000..e27ea07
--- /dev/null
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/TestBucketAssigner.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.partitioner;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.table.action.commit.BucketType;
+import org.apache.hudi.table.action.commit.SmallFile;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test cases for {@link BucketAssigner}.
+ *
+ * <p>All cases run against {@link MockBucketAssigner}, which serves the small files
+ * of each partition from an in-memory map supplied by the test.
+ */
+public class TestBucketAssigner {
+  private HoodieWriteConfig writeConfig;
+  private HoodieFlinkEngineContext context;
+
+  @TempDir
+  File tempFile;
+
+  /**
+   * Builds the write config and the engine context from the default test configuration
+   * rooted at the temporary directory.
+   */
+  @BeforeEach
+  public void before() throws IOException {
+    final String basePath = tempFile.getAbsolutePath();
+    final Configuration conf = TestConfigurations.getDefaultConf(basePath);
+
+    writeConfig = StreamerUtil.getHoodieClientConfig(conf);
+    context = new HoodieFlinkEngineContext(
+        new SerializableConfiguration(StreamerUtil.getHadoopConf()),
+        new FlinkTaskContextSupplier(null));
+    StreamerUtil.initTableIfNotExists(conf);
+  }
+
+  /**
+   * Every update must yield an UPDATE bucket carrying the requested partition path and
+   * file ID, also when the same bucket is requested repeatedly.
+   */
+  @Test
+  public void testAddUpdate() {
+    MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig);
+    BucketInfo bucketInfo = mockBucketAssigner.addUpdate("par1", "file_id_0");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "file_id_0");
+
+    mockBucketAssigner.addUpdate("par1", "file_id_0");
+    bucketInfo = mockBucketAssigner.addUpdate("par1", "file_id_0");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "file_id_0");
+
+    mockBucketAssigner.addUpdate("par1", "file_id_1");
+    bucketInfo = mockBucketAssigner.addUpdate("par1", "file_id_1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "file_id_1");
+
+    bucketInfo = mockBucketAssigner.addUpdate("par2", "file_id_0");
+    assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "file_id_0");
+
+    bucketInfo = mockBucketAssigner.addUpdate("par3", "file_id_2");
+    assertBucketEquals(bucketInfo, "par3", BucketType.UPDATE, "file_id_2");
+  }
+
+  /**
+   * Without any small files, every insert must yield an INSERT bucket of the requested
+   * partition (the generated file ID is not checked).
+   */
+  @Test
+  public void testAddInsert() {
+    MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig);
+    BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.INSERT);
+
+    mockBucketAssigner.addInsert("par1");
+    bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.INSERT);
+
+    mockBucketAssigner.addInsert("par2");
+    bucketInfo = mockBucketAssigner.addInsert("par2");
+    assertBucketEquals(bucketInfo, "par2", BucketType.INSERT);
+
+    bucketInfo = mockBucketAssigner.addInsert("par3");
+    assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
+
+    bucketInfo = mockBucketAssigner.addInsert("par3");
+    assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
+  }
+
+  /**
+   * Inserts into a partition that has small files must be routed to an existing small
+   * file as an UPDATE bucket; a partition without small files still gets INSERT buckets.
+   */
+  @Test
+  public void testInsertWithSmallFiles() {
+    MockBucketAssigner mockBucketAssigner =
+        new MockBucketAssigner(context, writeConfig, smallFilesFixture());
+    BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+    mockBucketAssigner.addInsert("par1");
+    bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+    mockBucketAssigner.addInsert("par2");
+    bucketInfo = mockBucketAssigner.addInsert("par2");
+    assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "f2");
+
+    bucketInfo = mockBucketAssigner.addInsert("par3");
+    assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
+
+    bucketInfo = mockBucketAssigner.addInsert("par3");
+    assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
+  }
+
+  /**
+   * Interleaved updates, including updates to file IDs that are not small files of the
+   * partition, must not change the small file chosen for the inserts.
+   */
+  @Test
+  public void testUpdateAndInsertWithSmallFiles() {
+    MockBucketAssigner mockBucketAssigner =
+        new MockBucketAssigner(context, writeConfig, smallFilesFixture());
+    mockBucketAssigner.addUpdate("par1", "f0");
+
+    BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+    mockBucketAssigner.addInsert("par1");
+    bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+    mockBucketAssigner.addUpdate("par1", "f2");
+
+    mockBucketAssigner.addInsert("par1");
+    bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+    mockBucketAssigner.addUpdate("par2", "f0");
+
+    mockBucketAssigner.addInsert("par2");
+    bucketInfo = mockBucketAssigner.addInsert("par2");
+    assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "f2");
+  }
+
+  /**
+   * Creates a small file located at instant {@code t0}.
+   *
+   * @param fileId    The file ID of the small file
+   * @param sizeBytes The current size of the small file in bytes
+   */
+  private static SmallFile smallFile(String fileId, int sizeBytes) {
+    SmallFile file = new SmallFile();
+    file.location = new HoodieRecordLocation("t0", fileId);
+    file.sizeBytes = sizeBytes;
+    return file;
+  }
+
+  /**
+   * Builds the small files shared by the small-file tests: {@code f0} and {@code f1}
+   * under {@code par1}, {@code f2} under {@code par2}.
+   */
+  private static Map<String, List<SmallFile>> smallFilesFixture() {
+    Map<String, List<SmallFile>> smallFilesMap = new HashMap<>();
+    smallFilesMap.put("par1", Arrays.asList(
+        smallFile("f0", 12),
+        // no left space to append new records to this bucket
+        smallFile("f1", 122879)));
+    smallFilesMap.put("par2", Collections.singletonList(smallFile("f2", 56)));
+    return smallFilesMap;
+  }
+
+  /**
+   * Asserts that the bucket equals the one built from the given type, file ID and partition.
+   */
+  private void assertBucketEquals(
+      BucketInfo bucketInfo,
+      String partition,
+      BucketType bucketType,
+      String fileId) {
+    BucketInfo expected = new BucketInfo(bucketType, fileId, partition);
+    assertThat(bucketInfo, is(expected));
+  }
+
+  /**
+   * Asserts only the partition path and the bucket type; the file ID is left unchecked.
+   */
+  private void assertBucketEquals(
+      BucketInfo bucketInfo,
+      String partition,
+      BucketType bucketType) {
+    assertThat(bucketInfo.getPartitionPath(), is(partition));
+    assertThat(bucketInfo.getBucketType(), is(bucketType));
+  }
+
+  /**
+   * Mock BucketAssigner that can specify small files explicitly.
+   */
+  static class MockBucketAssigner extends BucketAssigner {
+    private final Map<String, List<SmallFile>> smallFilesMap;
+
+    /** Creates an assigner for which no partition has any small file. */
+    MockBucketAssigner(
+        HoodieFlinkEngineContext context,
+        HoodieWriteConfig config) {
+      this(context, config, Collections.emptyMap());
+    }
+
+    /**
+     * Creates an assigner with the given small files.
+     *
+     * @param smallFilesMap The small files to report, keyed by partition path
+     */
+    MockBucketAssigner(
+        HoodieFlinkEngineContext context,
+        HoodieWriteConfig config,
+        Map<String, List<SmallFile>> smallFilesMap) {
+      super(context, config);
+      this.smallFilesMap = smallFilesMap;
+    }
+
+    /** Returns the configured small files of the partition, or an empty list if none. */
+    @Override
+    protected List<SmallFile> getSmallFiles(String partitionPath) {
+      return this.smallFilesMap.getOrDefault(partitionPath, Collections.emptyList());
+    }
+  }
+}
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
index 1b02791..59de283 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
@@ -18,7 +18,14 @@
 
 package org.apache.hudi.operator.utils;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.operator.StreamWriteFunction;
+import org.apache.hudi.operator.StreamWriteOperatorCoordinator;
+import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
+import org.apache.hudi.operator.partitioner.BucketAssignFunction;
+import org.apache.hudi.operator.transform.RowDataToHoodieFunction;
+
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -27,13 +34,9 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import 
org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
 import 
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
-
-import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.operator.StreamWriteFunction;
-import org.apache.hudi.operator.StreamWriteOperatorCoordinator;
-import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -43,7 +46,6 @@ import java.util.concurrent.CompletableFuture;
  * @param <I> Input type
  */
 public class StreamWriteFunctionWrapper<I> {
-  private final TypeSerializer<I> serializer;
   private final Configuration conf;
 
   private final IOManager ioManager;
@@ -52,10 +54,18 @@ public class StreamWriteFunctionWrapper<I> {
   private final StreamWriteOperatorCoordinator coordinator;
   private final MockFunctionInitializationContext 
functionInitializationContext;
 
-  private StreamWriteFunction<Object, I, Object> function;
+  /** Function that converts row data to HoodieRecord. */
+  private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
+  /** Function that assigns bucket ID. */
+  private BucketAssignFunction<String, HoodieRecord<?>, HoodieRecord<?>> 
bucketAssignerFunction;
+  /** Stream write function. */
+  private StreamWriteFunction<Object, HoodieRecord<?>, Object> writeFunction;
 
-  public StreamWriteFunctionWrapper(String tablePath, TypeSerializer<I> 
serializer) throws Exception {
-    this.serializer = serializer;
+  public StreamWriteFunctionWrapper(String tablePath) throws Exception {
+    this(tablePath, TestConfigurations.getDefaultConf(tablePath));
+  }
+
+  public StreamWriteFunctionWrapper(String tablePath, Configuration conf) 
throws Exception {
     this.ioManager = new IOManagerAsync();
     MockEnvironment environment = new MockEnvironmentBuilder()
         .setTaskName("mockTask")
@@ -64,7 +74,7 @@ public class StreamWriteFunctionWrapper<I> {
         .build();
     this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, 
environment);
     this.gateway = new MockOperatorEventGateway();
-    this.conf = TestConfigurations.getDefaultConf(tablePath);
+    this.conf = conf;
     // one function
     this.coordinator = new StreamWriteOperatorCoordinator(conf, 1);
     this.coordinator.start();
@@ -72,14 +82,37 @@ public class StreamWriteFunctionWrapper<I> {
   }
 
   public void openFunction() throws Exception {
-    function = new StreamWriteFunction<>(TestConfigurations.ROW_TYPE, 
this.conf);
-    function.setRuntimeContext(runtimeContext);
-    function.setOperatorEventGateway(gateway);
-    function.open(this.conf);
+    toHoodieFunction = new 
RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf);
+    toHoodieFunction.setRuntimeContext(runtimeContext);
+    toHoodieFunction.open(conf);
+
+    bucketAssignerFunction = new BucketAssignFunction<>(conf);
+    bucketAssignerFunction.setRuntimeContext(runtimeContext);
+    bucketAssignerFunction.open(conf);
+    bucketAssignerFunction.initializeState(this.functionInitializationContext);
+
+    writeFunction = new StreamWriteFunction<>(conf);
+    writeFunction.setRuntimeContext(runtimeContext);
+    writeFunction.setOperatorEventGateway(gateway);
+    writeFunction.open(conf);
   }
 
   public void invoke(I record) throws Exception {
-    function.processElement(record, null, null);
+    HoodieRecord<?> hoodieRecord = toHoodieFunction.map((RowData) record);
+    HoodieRecord<?>[] hoodieRecords = new HoodieRecord[1];
+    Collector<HoodieRecord<?>> collector = new Collector<HoodieRecord<?>>() {
+      @Override
+      public void collect(HoodieRecord<?> record) {
+        hoodieRecords[0] = record;
+      }
+
+      @Override
+      public void close() {
+
+      }
+    };
+    bucketAssignerFunction.processElement(hoodieRecord, null, collector);
+    writeFunction.processElement(hoodieRecords[0], null, null);
   }
 
   public BatchWriteSuccessEvent[] getEventBuffer() {
@@ -92,19 +125,22 @@ public class StreamWriteFunctionWrapper<I> {
 
   @SuppressWarnings("rawtypes")
   public HoodieFlinkWriteClient getWriteClient() {
-    return this.function.getWriteClient();
+    return this.writeFunction.getWriteClient();
   }
 
   public void checkpointFunction(long checkpointId) throws Exception {
     // checkpoint the coordinator first
     this.coordinator.checkpointCoordinator(checkpointId, new 
CompletableFuture<>());
-    function.snapshotState(new MockFunctionSnapshotContext(checkpointId));
+    bucketAssignerFunction.snapshotState(null);
+
+    writeFunction.snapshotState(null);
     
functionInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
   }
 
   public void checkpointComplete(long checkpointId) {
     
functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
     coordinator.checkpointComplete(checkpointId);
+    this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
   }
 
   public void checkpointFails(long checkpointId) {
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
 
b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
index 7513fed..d9e603a 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.operator.utils;
 
 import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
@@ -56,4 +57,16 @@ public class TestConfigurations {
     conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
     return conf;
   }
+
+  /**
+   * Builds the default {@link FlinkStreamerConfig} used by the tests.
+   *
+   * @param tablePath The base path of the target table
+   * @return a streamer config that targets {@code tablePath} and reads its schema from
+   *         the {@code test_read_schema.avsc} classpath resource
+   */
+  public static FlinkStreamerConfig getDefaultStreamerConf(String tablePath) {
+    final FlinkStreamerConfig config = new FlinkStreamerConfig();
+    // the schema file is resolved from the classpath and must exist
+    final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    final String readSchemaPath =
+        Objects.requireNonNull(classLoader.getResource("test_read_schema.avsc")).toString();
+
+    config.targetBasePath = tablePath;
+    config.readSchemaFilePath = readSchemaPath;
+    config.targetTableName = "TestHoodieTable";
+    config.partitionPathField = "partition";
+    config.tableType = "COPY_ON_WRITE";
+    config.checkpointInterval = 4000L;
+    return config;
+  }
 }
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java 
b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
index 7c2c314..b4c24ef 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
@@ -43,6 +43,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.IntStream;
 
 import static junit.framework.TestCase.assertEquals;
 import static org.hamcrest.CoreMatchers.is;
@@ -92,6 +93,13 @@ public class TestData {
           TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
   );
 
+  /**
+   * Five identical rows ({@code id1}, {@code Danny}, {@code 23}, timestamp {@code 1},
+   * partition {@code par1}), i.e. a data set that consists of duplicates only.
+   */
+  public static final List<RowData> DATA_SET_THREE = new ArrayList<>();
+
+  static {
+    // the reference is final so that the shared fixture cannot be swapped by a test
+    IntStream.range(0, 5).forEach(i -> DATA_SET_THREE.add(
+        binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+            TimestampData.fromEpochMillis(1), StringData.fromString("par1"))));
+  }
+
+
   /**
    * Checks the source data TestConfigurations.DATA_SET_ONE are written as 
expected.
    *
@@ -101,13 +109,29 @@ public class TestData {
    * @param expected The expected results mapping, the key should be the 
partition path
    */
   public static void checkWrittenData(File baseFile, Map<String, String> 
expected) throws IOException {
+    checkWrittenData(baseFile, expected, 4);
+  }
+
+  /**
+   * Checks the source data TestConfigurations.DATA_SET_ONE are written as 
expected.
+   *
+   * <p>Note: Replace it with the Flink reader when it is supported.
+   *
+   * @param baseFile   The file base to check, should be a directory
+   * @param expected   The expected results mapping, the key should be the 
partition path
+   * @param partitions The expected partition number
+   */
+  public static void checkWrittenData(
+      File baseFile,
+      Map<String, String> expected,
+      int partitions) throws IOException {
     assert baseFile.isDirectory();
     FileFilter filter = file -> !file.getName().startsWith(".");
     File[] partitionDirs = baseFile.listFiles(filter);
     assertNotNull(partitionDirs);
-    assertThat(partitionDirs.length, is(4));
+    assertThat(partitionDirs.length, is(partitions));
     for (File partitionDir : partitionDirs) {
-      File[] dataFiles = partitionDir.listFiles(file -> 
file.getName().endsWith(".parquet"));
+      File[] dataFiles = partitionDir.listFiles(filter);
       assertNotNull(dataFiles);
       File latestDataFile = Arrays.stream(dataFiles)
           .max(Comparator.comparing(f -> FSUtils.getCommitTime(f.getName())))

Reply via email to