This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e4e2fbc  [HUDI-1419] Add base implementation for hudi java client 
(#2286)
e4e2fbc is described below

commit e4e2fbc3bb2c4796c6813114dd1c37ffa5a1e03a
Author: Shen Hong <[email protected]>
AuthorDate: Sun Dec 20 11:25:27 2020 +0800

    [HUDI-1419] Add base implementation for hudi java client (#2286)
---
 .../hudi/table/action/commit/Partitioner.java      |   0
 hudi-client/hudi-java-client/pom.xml               |   6 +
 .../apache/hudi/client/HoodieJavaWriteClient.java  | 235 +++++++++++++++
 .../client/common/HoodieJavaEngineContext.java     |   4 +
 .../client/common/JavaTaskContextSupplier.java}    |  28 +-
 .../hudi/execution/JavaLazyInsertIterable.java     |  80 +++++
 .../org/apache/hudi/index/JavaHoodieIndex.java     |  71 +++++
 .../apache/hudi/index/JavaInMemoryHashIndex.java   | 120 ++++++++
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     | 178 +++++++++++
 .../hudi/table/HoodieJavaMergeOnReadTable.java}    |  16 +-
 .../org/apache/hudi/table/HoodieJavaTable.java     |  72 +++++
 .../action/clean/JavaCleanActionExecutor.java      | 130 ++++++++
 .../commit/BaseJavaCommitActionExecutor.java       | 329 +++++++++++++++++++++
 .../commit/JavaInsertCommitActionExecutor.java     |  50 ++++
 .../JavaInsertPreppedCommitActionExecutor.java     |  49 +++
 .../hudi/table/action/commit/JavaMergeHelper.java  | 115 +++++++
 .../commit/JavaUpsertCommitActionExecutor.java     |  50 ++++
 .../JavaUpsertPreppedCommitActionExecutor.java     |  49 +++
 .../hudi/table/action/commit/JavaWriteHelper.java  |  68 +++++
 .../table/action/commit/UpsertPartitioner.java     | 319 ++++++++++++++++++++
 hudi-examples/pom.xml                              |   6 +
 .../common/HoodieExampleDataGenerator.java         |   2 +-
 .../java/HoodieJavaWriteClientExample.java         | 109 +++++++
 23 files changed, 2074 insertions(+), 12 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
similarity index 100%
copy from 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
copy to 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
diff --git a/hudi-client/hudi-java-client/pom.xml 
b/hudi-client/hudi-java-client/pom.xml
index 6429ade..30835e2 100644
--- a/hudi-client/hudi-java-client/pom.xml
+++ b/hudi-client/hudi-java-client/pom.xml
@@ -55,6 +55,12 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-hadoop-mr</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
 
         <!-- Test -->
         <dependency>
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
new file mode 100644
index 0000000..67a6071
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.JavaHoodieIndex;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieJavaTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Hoodie write client for the Java engine. Records, keys and write statuses are all
+ * exchanged as in-memory {@link List}s.
+ *
+ * <p>Upsert and insert (plain and prepped) are implemented here. Bulk insert, delete and
+ * the compaction entry points throw {@link HoodieNotSupportedException}.
+ *
+ * @param <T> payload type carried by the records
+ */
+public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
+    AbstractHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+  public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+    super(context, clientConfig);
+  }
+
+  public HoodieJavaWriteClient(HoodieEngineContext context,
+                               HoodieWriteConfig writeConfig,
+                               boolean rollbackPending,
+                               Option<EmbeddedTimelineService> timelineService) {
+    super(context, writeConfig, rollbackPending, timelineService);
+  }
+
+  /**
+   * Tags the given records through the index and returns only those whose current
+   * location is not known. The time spent tagging is reported under {@code LOOKUP_STR}.
+   *
+   * @param hoodieRecords records to check against the index
+   * @return the records for which the index reported no current location
+   */
+  @Override
+  public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
+    // Create a Hoodie table which encapsulates the commits and files visible
+    HoodieJavaTable<T> table =
+        HoodieJavaTable.create(config, (HoodieJavaEngineContext) context);
+    Timer.Context indexTimer = metrics.getIndexCtx();
+    List<HoodieRecord<T>> recordsWithLocation =
+        getIndex().tagLocation(hoodieRecords, context, table);
+    // The timer context may be null; report a zero duration in that case.
+    metrics.updateIndexMetrics(LOOKUP_STR,
+        metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
+    return recordsWithLocation.stream()
+        .filter(v1 -> !v1.isCurrentLocationKnown())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Creates the index for this client from the write config that was passed in.
+   *
+   * @param writeConfig config used to select and configure the index
+   */
+  @Override
+  protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
+      createIndex(HoodieWriteConfig writeConfig) {
+    // Use the parameter rather than the inherited field so the argument is honoured.
+    return JavaHoodieIndex.createIndex(writeConfig);
+  }
+
+  /**
+   * Commits the given write statuses by extracting their write stats and delegating to
+   * {@code commitStats}.
+   *
+   * @return the result of {@code commitStats}
+   */
+  @Override
+  public boolean commit(String instantTime,
+                        List<WriteStatus> writeStatuses,
+                        Option<Map<String, String>> extraMetadata,
+                        String commitActionType,
+                        Map<String, List<String>> partitionToReplacedFileIds) {
+    List<HoodieWriteStat> writeStats =
+        writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+    return commitStats(instantTime, writeStats, extraMetadata, commitActionType,
+        partitionToReplacedFileIds);
+  }
+
+  /**
+   * Creates the table for the given config. {@code hadoopConf} is not used here; the
+   * engine context held by this client is passed instead.
+   */
+  @Override
+  protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
+      createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+    return HoodieJavaTable.create(config, context);
+  }
+
+  /**
+   * Upserts the records at the given instant and runs the post-write steps.
+   *
+   * @return the write statuses produced by the upsert
+   */
+  @Override
+  public List<WriteStatus> upsert(List<HoodieRecord<T>> records,
+                                  String instantTime) {
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+        getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
+    table.validateUpsertSchema();
+    setOperationType(WriteOperationType.UPSERT);
+    this.asyncCleanerService =
+        AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    HoodieWriteMetadata<List<WriteStatus>> result =
+        table.upsert(context, instantTime, records);
+    if (result.getIndexLookupDuration().isPresent()) {
+      metrics.updateIndexMetrics(LOOKUP_STR,
+          result.getIndexLookupDuration().get().toMillis());
+    }
+    return postWrite(result, instantTime, table);
+  }
+
+  /**
+   * Upserts records that the caller has already prepared, then runs the post-write steps.
+   *
+   * @return the write statuses produced by the upsert
+   */
+  @Override
+  public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecords,
+                                                String instantTime) {
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+        getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
+    table.validateUpsertSchema();
+    setOperationType(WriteOperationType.UPSERT_PREPPED);
+    this.asyncCleanerService =
+        AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    HoodieWriteMetadata<List<WriteStatus>> result =
+        table.upsertPrepped(context, instantTime, preppedRecords);
+    return postWrite(result, instantTime, table);
+  }
+
+  /**
+   * Inserts the records at the given instant and runs the post-write steps.
+   *
+   * @return the write statuses produced by the insert
+   */
+  @Override
+  public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+        getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
+    // Insert path: validate with the insert check, as insertPreppedRecords does.
+    table.validateInsertSchema();
+    setOperationType(WriteOperationType.INSERT);
+    this.asyncCleanerService =
+        AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    HoodieWriteMetadata<List<WriteStatus>> result =
+        table.insert(context, instantTime, records);
+    if (result.getIndexLookupDuration().isPresent()) {
+      metrics.updateIndexMetrics(LOOKUP_STR,
+          result.getIndexLookupDuration().get().toMillis());
+    }
+    return postWrite(result, instantTime, table);
+  }
+
+  /**
+   * Inserts records that the caller has already prepared, then runs the post-write steps.
+   *
+   * @return the write statuses produced by the insert
+   */
+  @Override
+  public List<WriteStatus> insertPreppedRecords(List<HoodieRecord<T>> preppedRecords,
+                                                String instantTime) {
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+        getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
+    table.validateInsertSchema();
+    setOperationType(WriteOperationType.INSERT_PREPPED);
+    this.asyncCleanerService =
+        AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    HoodieWriteMetadata<List<WriteStatus>> result =
+        table.insertPrepped(context, instantTime, preppedRecords);
+    return postWrite(result, instantTime, table);
+  }
+
+  /** Not supported by this client; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records,
+                                      String instantTime) {
+    throw new HoodieNotSupportedException("BulkInsert is not supported in HoodieJavaClient");
+  }
+
+  /** Not supported by this client; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public List<WriteStatus> bulkInsert(
+      List<HoodieRecord<T>> records,
+      String instantTime,
+      Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+    throw new HoodieNotSupportedException("BulkInsert is not supported in HoodieJavaClient");
+  }
+
+  /** Not supported by this client; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public List<WriteStatus> bulkInsertPreppedRecords(
+      List<HoodieRecord<T>> preppedRecords,
+      String instantTime,
+      Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+    throw new HoodieNotSupportedException(
+        "BulkInsertPreppedRecords is not supported in HoodieJavaClient");
+  }
+
+  /** Not supported by this client; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public List<WriteStatus> delete(List<HoodieKey> keys,
+                                  String instantTime) {
+    throw new HoodieNotSupportedException("Delete is not supported in HoodieJavaClient");
+  }
+
+  /**
+   * Reports index-update and finalize metrics for a finished write and, when the write
+   * was committed, runs the post-commit steps and emits commit metrics.
+   *
+   * @return the write statuses carried by {@code result}
+   */
+  @Override
+  protected List<WriteStatus> postWrite(
+      HoodieWriteMetadata<List<WriteStatus>> result,
+      String instantTime,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
+    // Guard on the same Option that is dereferenced below; guarding on the lookup
+    // duration could call get() on an absent update duration.
+    if (result.getIndexUpdateDuration().isPresent()) {
+      metrics.updateIndexMetrics(getOperationType().name(),
+          result.getIndexUpdateDuration().get().toMillis());
+    }
+    if (result.isCommitted()) {
+      // Perform post commit operations.
+      if (result.getFinalizeDuration().isPresent()) {
+        metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
+            result.getWriteStats().get().size());
+      }
+
+      postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
+
+      emitCommitMetrics(instantTime, result.getCommitMetadata().get(),
+          hoodieTable.getMetaClient().getCommitActionType());
+    }
+    return result.getWriteStatuses();
+  }
+
+  /** Not supported by this client; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public void commitCompaction(String compactionInstantTime,
+                               List<WriteStatus> writeStatuses,
+                               Option<Map<String, String>> extraMetadata) throws IOException {
+    throw new HoodieNotSupportedException(
+        "CommitCompaction is not supported in HoodieJavaClient");
+  }
+
+  /** Not supported by this client; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  protected void completeCompaction(
+      HoodieCommitMetadata metadata,
+      List<WriteStatus> writeStatuses,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+      String compactionCommitTime) {
+    throw new HoodieNotSupportedException(
+        "CompleteCompaction is not supported in HoodieJavaClient");
+  }
+
+  /** Not supported by this client; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  protected List<WriteStatus> compact(String compactionInstantTime,
+                                      boolean shouldComplete) {
+    throw new HoodieNotSupportedException("Compact is not supported in HoodieJavaClient");
+  }
+
+  /**
+   * Creates a fresh meta client and returns the table for the operation, starting the
+   * write timer as a side effect. {@code instantTime} is currently unused here.
+   */
+  @Override
+  protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
+      getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    // NOTE(review): the upgrade/downgrade step is disabled for this client:
+    // new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient,
+    // HoodieTableVersion.current(), config, context, instantTime);
+    return getTableAndInitCtx(metaClient, operationType);
+  }
+
+  /**
+   * Builds the table on top of {@code metaClient} and starts the commit or delta-commit
+   * timer, depending on the table's commit action type.
+   */
+  private HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
+      getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) {
+    if (operationType == WriteOperationType.DELETE) {
+      setWriteSchemaForDeletes(metaClient);
+    }
+    // Create a Hoodie table which encapsulates the commits and files visible
+    HoodieJavaTable<T> table =
+        HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient);
+    if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
+      writeTimer = metrics.getCommitCtx();
+    } else {
+      writeTimer = metrics.getDeltaCommitCtx();
+    }
+    return table;
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index a5cbdd1..7266310 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -43,6 +43,10 @@ import static 
org.apache.hudi.client.common.function.FunctionWrapper.throwingMap
  */
 public class HoodieJavaEngineContext extends HoodieEngineContext {
 
+  public HoodieJavaEngineContext(Configuration conf) {
+    this(conf, new JavaTaskContextSupplier());
+  }
+
   public HoodieJavaEngineContext(Configuration conf, TaskContextSupplier 
taskContextSupplier) {
     super(new SerializableConfiguration(conf), taskContextSupplier);
   }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
similarity index 59%
copy from 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
copy to 
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
index 2d52c50..100d237 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
@@ -16,12 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action.commit;
+package org.apache.hudi.client.common;
 
-import java.io.Serializable;
+import org.apache.hudi.common.util.Option;
 
-public interface Partitioner extends Serializable {
-  int getNumPartitions();
+import java.util.function.Supplier;
 
-  int getPartition(Object key);
+/**
+ * Task context supplier for the Java client: every id supplier yields zero and no
+ * engine property is reported.
+ */
+public class JavaTaskContextSupplier extends TaskContextSupplier {
+
+  /** Returns a supplier that always yields partition id {@code 0}. */
+  @Override
+  public Supplier<Integer> getPartitionIdSupplier() {
+    final Integer partitionId = 0;
+    return () -> partitionId;
+  }
+
+  /** Returns a supplier that always yields stage id {@code 0}. */
+  @Override
+  public Supplier<Integer> getStageIdSupplier() {
+    final Integer stageId = 0;
+    return () -> stageId;
+  }
+
+  /** Returns a supplier that always yields attempt id {@code 0L}. */
+  @Override
+  public Supplier<Long> getAttemptIdSupplier() {
+    final Long attemptId = 0L;
+    return () -> attemptId;
+  }
+
+  /** Always returns an empty option, whatever property is asked for. */
+  @Override
+  public Option<String> getProperty(EngineProperty prop) {
+    return Option.empty();
+  }
 }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
new file mode 100644
index 0000000..08c4831
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class JavaLazyInsertIterable<T extends HoodieRecordPayload> extends 
HoodieLazyInsertIterable<T> {
+  public JavaLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
+                                boolean areRecordsSorted,
+                                HoodieWriteConfig config,
+                                String instantTime,
+                                HoodieTable hoodieTable,
+                                String idPrefix,
+                                TaskContextSupplier taskContextSupplier) {
+    super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, 
idPrefix, taskContextSupplier);
+  }
+
+  public JavaLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
+                                boolean areRecordsSorted,
+                                HoodieWriteConfig config,
+                                String instantTime,
+                                HoodieTable hoodieTable,
+                                String idPrefix,
+                                TaskContextSupplier taskContextSupplier,
+                                WriteHandleFactory writeHandleFactory) {
+    super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, 
idPrefix, taskContextSupplier, writeHandleFactory);
+  }
+
+  @Override
+  protected List<WriteStatus> computeNext() {
+    // Executor service used for launching writer thread.
+    BoundedInMemoryExecutor<HoodieRecord<T>, 
HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> 
bufferedIteratorExecutor =
+        null;
+    try {
+      final Schema schema = new 
Schema.Parser().parse(hoodieConfig.getSchema());
+      bufferedIteratorExecutor =
+          new 
BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new 
IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), 
getTransformFunction(schema));
+      final List<WriteStatus> result = bufferedIteratorExecutor.execute();
+      assert result != null && !result.isEmpty() && 
!bufferedIteratorExecutor.isRemaining();
+      return result;
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    } finally {
+      if (null != bufferedIteratorExecutor) {
+        bufferedIteratorExecutor.shutdownNow();
+      }
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
new file mode 100644
index 0000000..3cec3fb
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+
+public abstract class JavaHoodieIndex<T extends HoodieRecordPayload> extends 
HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+  protected JavaHoodieIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  public static JavaHoodieIndex createIndex(HoodieWriteConfig config) {
+    // first use index class config to create index.
+    if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
+      Object instance = ReflectionUtils.loadClass(config.getIndexClass(), 
config);
+      if (!(instance instanceof HoodieIndex)) {
+        throw new HoodieIndexException(config.getIndexClass() + " is not a 
subclass of HoodieIndex");
+      }
+      return (JavaHoodieIndex) instance;
+    }
+
+    // TODO more indexes to be added
+    switch (config.getIndexType()) {
+      case INMEMORY:
+        return new JavaInMemoryHashIndex(config);
+      default:
+        throw new HoodieIndexException("Unsupported index type " + 
config.getIndexType());
+    }
+  }
+
+  @Override
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  public abstract List<WriteStatus> updateLocation(List<WriteStatus> 
writeStatuses,
+                                                   HoodieEngineContext context,
+                                                   HoodieTable<T, 
List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws 
HoodieIndexException;
+
+  @Override
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  public abstract List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> 
records,
+                                                    HoodieEngineContext 
context,
+                                                    HoodieTable<T, 
List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws 
HoodieIndexException;
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaInMemoryHashIndex.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaInMemoryHashIndex.java
new file mode 100644
index 0000000..e95ee61
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaInMemoryHashIndex.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+/**
+ * Hoodie Index implementation backed by an in-memory Hash map.
+ * <p>
+ * The map is static, so every instance of this class shares the same record locations.
+ * <p>
+ * ONLY USE FOR LOCAL TESTING
+ */
+@SuppressWarnings("checkstyle:LineLength")
+public class JavaInMemoryHashIndex<T extends HoodieRecordPayload> extends JavaHoodieIndex<T> {
+
+  private static ConcurrentMap<HoodieKey, HoodieRecordLocation> recordLocationMap;
+
+  public JavaInMemoryHashIndex(HoodieWriteConfig config) {
+    super(config);
+    // Create the shared map once, on first construction.
+    synchronized (JavaInMemoryHashIndex.class) {
+      if (recordLocationMap == null) {
+        recordLocationMap = new ConcurrentHashMap<>();
+      }
+    }
+  }
+
+  /**
+   * Sets the current location on every record whose key is present in the map.
+   *
+   * @return a new list holding the same record instances, in input order
+   */
+  @Override
+  public List<HoodieRecord<T>> tagLocation(
+      List<HoodieRecord<T>> records,
+      HoodieEngineContext context,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
+    List<HoodieRecord<T>> taggedRecords = new ArrayList<>();
+    for (HoodieRecord<T> record : records) {
+      // Single lookup: a containsKey/get pair could see the entry removed in between
+      // and tag the record with a null location.
+      HoodieRecordLocation location = recordLocationMap.get(record.getKey());
+      if (location != null) {
+        record.unseal();
+        record.setCurrentLocation(location);
+        record.seal();
+      }
+      taggedRecords.add(record);
+    }
+    return taggedRecords;
+  }
+
+  /**
+   * Applies the written records of each write status to the map: a record with a new
+   * location is stored, one without is removed. Errored records are skipped.
+   *
+   * @return a new list holding the same write statuses, in input order
+   */
+  @Override
+  public List<WriteStatus> updateLocation(
+      List<WriteStatus> writeStatusList,
+      HoodieEngineContext context,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
+    List<WriteStatus> updatedStatuses = new ArrayList<>();
+    for (WriteStatus writeStatus : writeStatusList) {
+      for (HoodieRecord record : writeStatus.getWrittenRecords()) {
+        if (!writeStatus.isErrored(record.getKey())) {
+          HoodieKey key = record.getKey();
+          Option<HoodieRecordLocation> newLocation = record.getNewLocation();
+          if (newLocation.isPresent()) {
+            recordLocationMap.put(key, newLocation.get());
+          } else {
+            // Delete existing index for a deleted record
+            recordLocationMap.remove(key);
+          }
+        }
+      }
+      updatedStatuses.add(writeStatus);
+    }
+    return updatedStatuses;
+  }
+
+  /**
+   * Nothing is undone here; always reports success.
+   */
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  /**
+   * Reports this index as global. Lookups go through the single shared map.
+   */
+  @Override
+  public boolean isGlobal() {
+    return true;
+  }
+
+  /**
+   * Reports that log files can be indexed; locations are held in the in-memory map.
+   */
+  @Override
+  public boolean canIndexLogFiles() {
+    return true;
+  }
+
+  /**
+   * Index needs to be explicitly updated after storage write.
+   */
+  @Override
+  public boolean isImplicitWithStorage() {
+    return false;
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
new file mode 100644
index 0000000..7c45b75
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
+import org.apache.hudi.table.action.clean.JavaCleanActionExecutor;
+import org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor;
+import 
org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
+import 
org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copy-on-write table for the Hudi Java write client.
+ *
+ * <p>Upsert, insert, their prepped variants and clean are delegated to the corresponding Java
+ * action executors. All remaining operations throw {@link HoodieNotSupportedException}.
+ *
+ * @param <T> payload type of the records handled by this table
+ */
+public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieJavaTable<T> {
+
+  protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config,
+                                       HoodieEngineContext context,
+                                       HoodieTableMetaClient metaClient) {
+    super(config, context, metaClient);
+  }
+
+  /** Upserts {@code records} at {@code instantTime} through {@link JavaUpsertCommitActionExecutor}. */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context,
+                                                       String instantTime,
+                                                       List<HoodieRecord<T>> records) {
+    return new JavaUpsertCommitActionExecutor<>(context, config,
+        this, instantTime, records).execute();
+  }
+
+  /** Inserts {@code records} at {@code instantTime} through {@link JavaInsertCommitActionExecutor}. */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> insert(HoodieEngineContext context,
+                                                       String instantTime,
+                                                       List<HoodieRecord<T>> records) {
+    return new JavaInsertCommitActionExecutor<>(context, config,
+        this, instantTime, records).execute();
+  }
+
+  /** Not supported yet; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> bulkInsert(HoodieEngineContext context,
+                                                           String instantTime,
+                                                           List<HoodieRecord<T>> records,
+                                                           Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+    throw new HoodieNotSupportedException("BulkInsert is not supported yet");
+  }
+
+  /** Not supported yet; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> delete(HoodieEngineContext context,
+                                                       String instantTime,
+                                                       List<HoodieKey> keys) {
+    throw new HoodieNotSupportedException("Delete is not supported yet");
+  }
+
+  /**
+   * Upserts already-prepared records through {@link JavaUpsertPreppedCommitActionExecutor}.
+   *
+   * @throws ClassCastException if {@code context} is not a {@link HoodieJavaEngineContext}
+   */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
+                                                              String instantTime,
+                                                              List<HoodieRecord<T>> preppedRecords) {
+    return new JavaUpsertPreppedCommitActionExecutor<>((HoodieJavaEngineContext) context, config,
+        this, instantTime, preppedRecords).execute();
+  }
+
+  /**
+   * Inserts already-prepared records through {@link JavaInsertPreppedCommitActionExecutor}.
+   *
+   * @throws ClassCastException if {@code context} is not a {@link HoodieJavaEngineContext}
+   */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> insertPrepped(HoodieEngineContext context,
+                                                              String instantTime,
+                                                              List<HoodieRecord<T>> preppedRecords) {
+    return new JavaInsertPreppedCommitActionExecutor<>((HoodieJavaEngineContext) context, config,
+        this, instantTime, preppedRecords).execute();
+  }
+
+  /** Not supported yet; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context,
+                                                                  String instantTime,
+                                                                  List<HoodieRecord<T>> preppedRecords,
+                                                                  Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+    throw new HoodieNotSupportedException("BulkInsertPrepped is not supported yet");
+  }
+
+  /** Not supported yet; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(HoodieEngineContext context,
+                                                                String instantTime,
+                                                                List<HoodieRecord<T>> records) {
+    throw new HoodieNotSupportedException("InsertOverwrite is not supported yet");
+  }
+
+  /** Not supported yet; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(HoodieEngineContext context,
+                                                                     String instantTime,
+                                                                     List<HoodieRecord<T>> records) {
+    // Name the operation that was actually requested so the failure is not mistaken for insertOverwrite.
+    throw new HoodieNotSupportedException("InsertOverwriteTable is not supported yet");
+  }
+
+  /** Not supported yet; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context,
+                                                         String instantTime,
+                                                         Option<Map<String, String>> extraMetadata) {
+    throw new HoodieNotSupportedException("ScheduleCompaction is not supported yet");
+  }
+
+  /** Not supported yet; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> compact(HoodieEngineContext context,
+                                                        String compactionInstantTime) {
+    throw new HoodieNotSupportedException("Compact is not supported yet");
+  }
+
+  /** Not supported yet; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public HoodieBootstrapWriteMetadata<List<WriteStatus>> bootstrap(HoodieEngineContext context,
+                                                                   Option<Map<String, String>> extraMetadata) {
+    throw new HoodieNotSupportedException("Bootstrap is not supported yet");
+  }
+
+  /** Not supported yet; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public void rollbackBootstrap(HoodieEngineContext context,
+                                String instantTime) {
+    throw new HoodieNotSupportedException("RollbackBootstrap is not supported yet");
+  }
+
+  /** Runs a clean at {@code cleanInstantTime} through {@link JavaCleanActionExecutor}. */
+  @Override
+  public HoodieCleanMetadata clean(HoodieEngineContext context,
+                                   String cleanInstantTime) {
+    return new JavaCleanActionExecutor<>(context, config, this, cleanInstantTime).execute();
+  }
+
+  /** Not supported yet; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public HoodieRollbackMetadata rollback(HoodieEngineContext context,
+                                         String rollbackInstantTime,
+                                         HoodieInstant commitInstant,
+                                         boolean deleteInstants) {
+    throw new HoodieNotSupportedException("Rollback is not supported yet");
+  }
+
+  /** Not supported yet; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public HoodieSavepointMetadata savepoint(HoodieEngineContext context,
+                                           String instantToSavepoint,
+                                           String user,
+                                           String comment) {
+    throw new HoodieNotSupportedException("Savepoint is not supported yet");
+  }
+
+  /** Not supported yet; always throws {@link HoodieNotSupportedException}. */
+  @Override
+  public HoodieRestoreMetadata restore(HoodieEngineContext context,
+                                       String restoreInstantTime,
+                                       String instantToRestore) {
+    throw new HoodieNotSupportedException("Restore is not supported yet");
+  }
+}
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
similarity index 58%
rename from 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
rename to 
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
index 2d52c50..b446abe 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
@@ -16,12 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action.commit;
+package org.apache.hudi.table;
 
-import java.io.Serializable;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieWriteConfig;
 
-public interface Partitioner extends Serializable {
-  int getNumPartitions();
-
-  int getPartition(Object key);
+public class HoodieJavaMergeOnReadTable<T extends HoodieRecordPayload> extends 
HoodieJavaCopyOnWriteTable<T> {
+  protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, 
HoodieEngineContext context, HoodieTableMetaClient metaClient) {
+    super(config, context, metaClient);
+  }
+  // TODO: merge-on-read specific behavior is not supported yet; this class only inherits from HoodieJavaCopyOnWriteTable.
 }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
new file mode 100644
index 0000000..60a3065
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.index.JavaHoodieIndex;
+import org.apache.hudi.index.HoodieIndex;
+
+import java.util.List;
+
+public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
+    extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> {
+  protected HoodieJavaTable(HoodieWriteConfig config, HoodieEngineContext 
context, HoodieTableMetaClient metaClient) {
+    super(config, context, metaClient);
+  }
+
+  public static <T extends HoodieRecordPayload> HoodieJavaTable<T> 
create(HoodieWriteConfig config, HoodieEngineContext context) {
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
+        context.getHadoopConf().get(),
+        config.getBasePath(),
+        true,
+        config.getConsistencyGuardConfig(),
+        Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
+    );
+    return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, 
metaClient);
+  }
+
+  public static <T extends HoodieRecordPayload> HoodieJavaTable<T> 
create(HoodieWriteConfig config,
+                                                                          
HoodieJavaEngineContext context,
+                                                                          
HoodieTableMetaClient metaClient) {
+    switch (metaClient.getTableType()) {
+      case COPY_ON_WRITE:
+        return new HoodieJavaCopyOnWriteTable<>(config, context, metaClient);
+      case MERGE_ON_READ:
+        throw new HoodieNotSupportedException("MERGE_ON_READ is not supported 
yet");
+      default:
+        throw new HoodieException("Unsupported table type :" + 
metaClient.getTableType());
+    }
+  }
+
+  @Override
+  protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext 
context) {
+    return JavaHoodieIndex.createIndex(config);
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java
new file mode 100644
index 0000000..d1626c8
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.clean;
+
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.model.CleanFileInfo;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Clean action executor for the Hudi Java write client. Files named in the cleaner plan are
+ * deleted one after another in the calling thread and the outcome is summarised per partition.
+ *
+ * @param <T> payload type of the records stored in the table
+ */
+public class JavaCleanActionExecutor<T extends HoodieRecordPayload> extends
+    BaseCleanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+  private static final Logger LOG = LogManager.getLogger(JavaCleanActionExecutor.class);
+
+  public JavaCleanActionExecutor(HoodieEngineContext context,
+                                  HoodieWriteConfig config,
+                                  HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                                  String instantTime) {
+    super(context, config, table, instantTime);
+  }
+
+  /**
+   * Deletes every file listed in {@code cleanerPlan} and builds one {@link HoodieCleanStat} per
+   * partition named in the plan (including partitions for which nothing was deleted).
+   *
+   * @param context     engine context (not used by this implementation)
+   * @param cleanerPlan plan listing the files to delete per partition
+   * @return clean statistics, one entry per partition of the plan
+   */
+  @Override
+  List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
+
+    Iterator<ImmutablePair<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
+        .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))).iterator();
+
+    // deleteFilesFunc already yields exactly one entry per partition, so it can be collected directly.
+    Map<String, PartitionCleanStat> partitionCleanStatsMap = deleteFilesFunc(filesToBeDeletedPerPartition, table)
+        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
+
+    // The earliest retained instant is the same for every partition; resolve it once.
+    HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
+    Option<HoodieInstant> earliestCommitToRetain = Option.ofNullable(
+        actionInstant != null
+            ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
+            actionInstant.getAction(), actionInstant.getTimestamp())
+            : null);
+
+    // Return PartitionCleanStat for each partition passed.
+    return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
+      PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
+          ? partitionCleanStatsMap.get(partitionPath)
+          : new PartitionCleanStat(partitionPath);
+      return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
+          .withEarliestCommitRetained(earliestCommitToRetain)
+          .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
+          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
+          .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
+          .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
+          .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
+          .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
+          .build();
+    }).collect(Collectors.toList());
+  }
+
+  /**
+   * Deletes the given files and accumulates the outcome per partition.
+   *
+   * <p>A delete that throws an {@link IOException} is logged and recorded as a failed delete, so
+   * the remaining files are still processed.
+   *
+   * @param iter  (partition path, file to delete) pairs
+   * @param table table whose file system is used for the deletes
+   * @return one (partition path, statistics) pair per partition seen in {@code iter}
+   */
+  private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<ImmutablePair<String, CleanFileInfo>> iter, HoodieTable<?, ?, ?, ?> table) {
+    Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
+    FileSystem fs = table.getMetaClient().getFs();
+
+    while (iter.hasNext()) {
+      Pair<String, CleanFileInfo> partitionDelFileTuple = iter.next();
+      String partitionPath = partitionDelFileTuple.getLeft();
+      Path deletePath = new Path(partitionDelFileTuple.getRight().getFilePath());
+      String deletePathStr = deletePath.toString();
+      // Default to "not deleted" so that a failing delete is reported as a failure instead of
+      // handing a null result to PartitionCleanStat.
+      Boolean deletedFileResult = Boolean.FALSE;
+      try {
+        deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
+      } catch (IOException e) {
+        LOG.error("Delete file failed: " + deletePathStr, e);
+      }
+      PartitionCleanStat partitionCleanStat = partitionCleanStatMap.computeIfAbsent(partitionPath, PartitionCleanStat::new);
+      boolean isBootstrapBasePathFile = partitionDelFileTuple.getRight().isBootstrapBaseFile();
+      // For Bootstrap Base file deletions, store the full file path; otherwise only the file name.
+      String reportedPath = isBootstrapBasePathFile ? deletePathStr : deletePath.getName();
+      partitionCleanStat.addDeleteFilePatterns(reportedPath, isBootstrapBasePathFile);
+      partitionCleanStat.addDeletedFileResult(reportedPath, deletedFileResult, isBootstrapBasePathFile);
+    }
+    return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
new file mode 100644
index 0000000..17b02e8
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.execution.JavaLazyInsertIterable;
+import org.apache.hudi.io.CreateHandleFactory;
+import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class BaseJavaCommitActionExecutor<T extends 
HoodieRecordPayload> extends
+    BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>, HoodieWriteMetadata> {
+
+  private static final Logger LOG = 
LogManager.getLogger(BaseJavaCommitActionExecutor.class);
+
+  /**
+   * Creates an executor without extra commit metadata.
+   *
+   * @param context       engine context
+   * @param config        write config
+   * @param table         table to write to
+   * @param instantTime   instant time of the write
+   * @param operationType kind of write operation being executed
+   */
+  public BaseJavaCommitActionExecutor(HoodieEngineContext context,
+                                       HoodieWriteConfig config,
+                                       HoodieTable table,
+                                       String instantTime,
+                                       WriteOperationType operationType) {
+    this(context, config, table, instantTime, operationType, Option.empty());
+  }
+
+  /**
+   * Creates an executor that passes {@code extraMetadata} on to the base commit executor.
+   *
+   * @param context       engine context
+   * @param config        write config
+   * @param table         table to write to
+   * @param instantTime   instant time of the write
+   * @param operationType kind of write operation being executed
+   * @param extraMetadata optional extra metadata handed to the base class
+   */
+  public BaseJavaCommitActionExecutor(HoodieEngineContext context,
+                                       HoodieWriteConfig config,
+                                       HoodieTable table,
+                                       String instantTime,
+                                       WriteOperationType operationType,
+                                       Option extraMetadata) {
+    super(context, config, table, instantTime, operationType, extraMetadata);
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> 
inputRecords) {
+    HoodieWriteMetadata<List<WriteStatus>> result = new 
HoodieWriteMetadata<>();
+
+    WorkloadProfile profile = null;
+    if (isWorkloadProfileNeeded()) {
+      profile = new WorkloadProfile(buildProfile(inputRecords));
+      LOG.info("Workload profile :" + profile);
+      try {
+        saveWorkloadProfileMetadataToInflight(profile, instantTime);
+      } catch (Exception e) {
+        HoodieTableMetaClient metaClient = table.getMetaClient();
+        HoodieInstant inflightInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), 
instantTime);
+        try {
+          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), 
inflightInstant.getFileName()))) {
+            throw new HoodieCommitException("Failed to commit " + instantTime 
+ " unable to save inflight metadata ", e);
+          }
+        } catch (IOException ex) {
+          LOG.error("Check file exists failed");
+          throw new HoodieCommitException("Failed to commit " + instantTime + 
" unable to save inflight metadata ", ex);
+        }
+      }
+    }
+
+    final Partitioner partitioner = getPartitioner(profile);
+    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = 
partition(inputRecords, partitioner);
+
+    List<WriteStatus> writeStatuses = new LinkedList<>();
+    partitionedRecords.forEach((partition, records) -> {
+      if (WriteOperationType.isChangingRecords(operationType)) {
+        handleUpsertPartition(instantTime, partition, records.iterator(), 
partitioner).forEachRemaining(writeStatuses::addAll);
+      } else {
+        handleInsertPartition(instantTime, partition, records.iterator(), 
partitioner).forEachRemaining(writeStatuses::addAll);
+      }
+    });
+    updateIndex(writeStatuses, result);
+    return result;
+  }
+
+  /**
+   * Pushes the written record locations back into the table's index and stores the resulting
+   * write statuses, together with the time the index update took, in {@code result}.
+   *
+   * @param writeStatuses statuses produced by the write
+   * @param result        write metadata to fill in
+   */
+  protected void updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) {
+    Instant startedAt = Instant.now();
+    // Update the index back
+    List<WriteStatus> indexedStatuses = table.getIndex().updateLocation(writeStatuses, context, table);
+    Duration indexUpdateDuration = Duration.between(startedAt, Instant.now());
+    result.setIndexUpdateDuration(indexUpdateDuration);
+    result.setWriteStatuses(indexedStatuses);
+  }
+
+  /** Returns the commit action type reported by the table's meta client. */
+  @Override
+  protected String getCommitActionType() {
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    return metaClient.getCommitActionType();
+  }
+
+  /**
+   * Picks the partitioner for the current operation: the upsert partitioner when the operation
+   * changes existing records, the insert partitioner otherwise.
+   */
+  private Partitioner getPartitioner(WorkloadProfile profile) {
+    return WriteOperationType.isChangingRecords(operationType)
+        ? getUpsertPartitioner(profile)
+        : getInsertPartitioner(profile);
+  }
+
+  /**
+   * Groups the records by the bucket number the partitioner assigns to them. The partitioner is
+   * asked with a (key, optional current location) pair for every record.
+   *
+   * @param dedupedRecords records to distribute
+   * @param partitioner    partitioner deciding the bucket of each record
+   * @return records per bucket number
+   */
+  private Map<Integer, List<HoodieRecord<T>>> partition(List<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+    Map<Integer, List<HoodieRecord<T>>> recordsByBucket = dedupedRecords
+        .stream()
+        .collect(Collectors.groupingBy(
+            record -> partitioner.getPartition(Pair.of(record.getKey(), Option.ofNullable(record.getCurrentLocation())))));
+    // Copy into a LinkedHashMap, keeping the iteration order of the grouped map.
+    return new LinkedHashMap<>(recordsByBucket);
+  }
+
+  protected Pair<HashMap<String, WorkloadStat>, WorkloadStat> 
buildProfile(List<HoodieRecord<T>> inputRecords) {
+    HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
+    WorkloadStat globalStat = new WorkloadStat();
+
+    Map<Pair<String, Option<HoodieRecordLocation>>, Long> 
partitionLocationCounts = inputRecords
+        .stream()
+        .map(record -> Pair.of(
+            Pair.of(record.getPartitionPath(), 
Option.ofNullable(record.getCurrentLocation())), record))
+        .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));
+
+    for (Map.Entry<Pair<String, Option<HoodieRecordLocation>>, Long> e : 
partitionLocationCounts.entrySet()) {
+      String partitionPath = e.getKey().getLeft();
+      Long count = e.getValue();
+      Option<HoodieRecordLocation> locOption = e.getKey().getRight();
+
+      if (!partitionPathStatMap.containsKey(partitionPath)) {
+        partitionPathStatMap.put(partitionPath, new WorkloadStat());
+      }
+
+      if (locOption.isPresent()) {
+        // update
+        partitionPathStatMap.get(partitionPath).addUpdates(locOption.get(), 
count);
+        globalStat.addUpdates(locOption.get(), count);
+      } else {
+        // insert
+        partitionPathStatMap.get(partitionPath).addInserts(count);
+        globalStat.addInserts(count);
+      }
+    }
+    return Pair.of(partitionPathStatMap, globalStat);
+  }
+
+  /**
+   * Commits {@code result}, using the write stats taken from its write statuses.
+   *
+   * @param extraMetadata optional extra metadata to store with the commit
+   * @param result        write metadata whose write statuses provide the stats
+   */
+  @Override
+  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result) {
+    List<HoodieWriteStat> writeStats = result.getWriteStatuses().stream()
+        .map(WriteStatus::getStat)
+        .collect(Collectors.toList());
+    commit(extraMetadata, result, writeStats);
+  }
+
+  /**
+   * Finalizes the write and marks the instant as complete on the active timeline, storing the
+   * commit metadata built from {@code writeStats}.
+   *
+   * <p>{@code result} is flagged as committed and given the write stats before the write is
+   * finalized; the commit metadata is set on it once the instant has been saved as complete.
+   *
+   * @param extraMetadata optional extra metadata to store with the commit
+   * @param result        write metadata to update
+   * @param writeStats    write stats to record in the commit metadata
+   * @throws HoodieCommitException if the commit metadata cannot be written
+   */
+  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
+    String actionType = getCommitActionType();
+    LOG.info("Committing " + instantTime + ", action Type " + actionType);
+    result.setCommitted(true);
+    result.setWriteStats(writeStats);
+    // Finalize write
+    finalizeWrite(instantTime, writeStats, result);
+
+    try {
+      HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+      HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
+          extraMetadata, operationType, getSchemaToStoreInCommit(), actionType);
+
+      activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
+          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+      LOG.info("Committed " + instantTime);
+      result.setCommitMetadata(Option.of(metadata));
+    } catch (IOException e) {
+      throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
+          e);
+    }
+  }
+
+  /**
+   * Returns the file ids replaced per partition by this action. This base implementation
+   * ignores {@code writeStatuses} and always returns an empty map.
+   */
+  protected Map<String, List<String>> 
getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Always reports that a workload profile is needed by this executor.
+   */
+  @Override
+  protected boolean isWorkloadProfileNeeded() {
+    return true;
+  }
+
+  @SuppressWarnings("unchecked")
+  protected Iterator<List<WriteStatus>> handleUpsertPartition(String 
instantTime, Integer partition, Iterator recordItr,
+                                                              Partitioner 
partitioner) {
+    UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
+    BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
+    BucketType btype = binfo.bucketType;
+    try {
+      if (btype.equals(BucketType.INSERT)) {
+        return handleInsert(binfo.fileIdPrefix, recordItr);
+      } else if (btype.equals(BucketType.UPDATE)) {
+        return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, 
recordItr);
+      } else {
+        throw new HoodieUpsertException("Unknown bucketType " + btype + " for 
partition :" + partition);
+      }
+    } catch (Throwable t) {
+      String msg = "Error upserting bucketType " + btype + " for partition :" 
+ partition;
+      LOG.error(msg, t);
+      throw new HoodieUpsertException(msg, t);
+    }
+  }
+
+  /**
+   * Writes the records of one insert partition. Delegates unchanged to
+   * {@code handleUpsertPartition} with the same arguments.
+   */
+  protected Iterator<List<WriteStatus>> handleInsertPartition(String 
instantTime, Integer partition, Iterator recordItr,
+                                                              Partitioner 
partitioner) {
+    return handleUpsertPartition(instantTime, partition, recordItr, 
partitioner);
+  }
+
+  /**
+   * Merges the incoming records into the file identified by {@code fileId}.
+   * When there are no incoming records, a single empty status list is returned.
+   */
+  @Override
+  public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
+                                                  Iterator<HoodieRecord<T>> recordItr)
+      throws IOException {
+    if (recordItr.hasNext()) {
+      // these are updates
+      HoodieMergeHandle mergeHandle = getUpdateHandle(partitionPath, fileId, recordItr);
+      return handleUpdateInternal(mergeHandle, fileId);
+    }
+    // Some buckets are never picked in getPartition() and end up with 0 records.
+    LOG.info("Empty partition with fileId => " + fileId);
+    List<WriteStatus> noStatuses = Collections.emptyList();
+    return Collections.singletonList(noStatuses).iterator();
+  }
+
+  /**
+   * Runs the merge for the given handle and returns its write status wrapped in a
+   * single-element list.
+   *
+   * @throws HoodieUpsertException if the handle has no old file path to merge against
+   */
+  protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId)
+      throws IOException {
+    if (upsertHandle.getOldFilePath() == null) {
+      throw new HoodieUpsertException(
+          "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
+    }
+    JavaMergeHelper.newInstance().runMerge(table, upsertHandle);
+
+    // TODO(vc): This needs to be revisited
+    boolean partitionPathMissing = upsertHandle.getWriteStatus().getPartitionPath() == null;
+    if (partitionPathMissing) {
+      LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+          + upsertHandle.getWriteStatus());
+    }
+    List<WriteStatus> statuses = Collections.singletonList(upsertHandle.getWriteStatus());
+    return Collections.singletonList(statuses).iterator();
+  }
+
+  /**
+   * Creates the merge handle for an update: a sorted merge handle when the table requires
+   * sorted records, a plain merge handle otherwise.
+   */
+  protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
+    if (!table.requireSortedRecords()) {
+      return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId,
+          taskContextSupplier);
+    }
+    return new HoodieSortedMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId,
+        taskContextSupplier);
+  }
+
+  /**
+   * Creates a merge handle from records already keyed by record key, merging them
+   * against the given base file.
+   */
+  protected HoodieMergeHandle getUpdateHandle(String partitionPath, String 
fileId,
+                                              Map<String, HoodieRecord<T>> 
keyToNewRecords,
+                                              HoodieBaseFile 
dataFileToBeMerged) {
+    return new HoodieMergeHandle<>(config, instantTime, table, keyToNewRecords,
+        partitionPath, fileId, dataFileToBeMerged, taskContextSupplier);
+  }
+
+  /**
+   * Writes the incoming records as inserts through a {@link JavaLazyInsertIterable}.
+   * When there are no incoming records, a single empty status list is returned.
+   */
+  @Override
+  public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
+      throws Exception {
+    if (recordItr.hasNext()) {
+      return new JavaLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
+          taskContextSupplier, new CreateHandleFactory<>());
+    }
+    // Some buckets are never picked in getPartition() and end up with 0 records.
+    LOG.info("Empty partition");
+    List<WriteStatus> noStatuses = Collections.emptyList();
+    return Collections.singletonList(noStatuses).iterator();
+  }
+
+  /**
+   * Builds the partitioner used for upserts from the given workload profile.
+   *
+   * @throws HoodieUpsertException if {@code profile} is null
+   */
+  public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+    if (profile != null) {
+      return new UpsertPartitioner(profile, context, table, config);
+    }
+    throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
+  }
+
+  /**
+   * Provides a partitioner to perform the insert operation, based on the workload profile.
+   * Delegates to {@link #getUpsertPartitioner(WorkloadProfile)}, so inserts use the same partitioner as upserts.
+   */
+  public Partitioner getInsertPartitioner(WorkloadProfile profile) {
+    return getUpsertPartitioner(profile);
+  }
+
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertCommitActionExecutor.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertCommitActionExecutor.java
new file mode 100644
index 0000000..45cf3d6
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertCommitActionExecutor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+/**
+ * Commit action executor that inserts a list of records through the Java client.
+ *
+ * @param <T> payload type of the records being written
+ */
+public class JavaInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
+
+  // Records to insert; assigned once in the constructor, like the prepped executors.
+  private final List<HoodieRecord<T>> inputRecords;
+
+  public JavaInsertCommitActionExecutor(HoodieEngineContext context,
+                                         HoodieWriteConfig config,
+                                         HoodieTable table,
+                                         String instantTime,
+                                         List<HoodieRecord<T>> inputRecords) {
+    super(context, config, table, instantTime, WriteOperationType.INSERT);
+    this.inputRecords = inputRecords;
+  }
+
+  /**
+   * Runs the insert through {@link JavaWriteHelper}, combining records first when
+   * {@code config.shouldCombineBeforeInsert()} says so and using the insert shuffle parallelism.
+   */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    // NOTE(review): the trailing `false` presumably disables index tagging for inserts -
+    // confirm against AbstractWriteHelper#write.
+    return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
+        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertPreppedCommitActionExecutor.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertPreppedCommitActionExecutor.java
new file mode 100644
index 0000000..349cf69
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertPreppedCommitActionExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+/**
+ * Commit action executor for the INSERT_PREPPED operation: hands the already prepared
+ * records straight to the base executor.
+ *
+ * @param <T> payload type of the records being written
+ */
+public class JavaInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends BaseJavaCommitActionExecutor<T> {
+
+  // Records handed over by the caller in prepared form.
+  private final List<HoodieRecord<T>> preppedRecords;
+
+  public JavaInsertPreppedCommitActionExecutor(HoodieJavaEngineContext context,
+                                               HoodieWriteConfig config,
+                                               HoodieTable table,
+                                               String instantTime,
+                                               List<HoodieRecord<T>> preppedRecords) {
+    super(context, config, table, instantTime, WriteOperationType.INSERT_PREPPED);
+    this.preppedRecords = preppedRecords;
+  }
+
+  /**
+   * Executes the write with the prepared records via the base executor.
+   */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    return super.execute(this.preppedRecords);
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
new file mode 100644
index 0000000..bd596be
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Merge helper for the Java client: streams the records of an existing base file through a
+ * {@link BoundedInMemoryExecutor} into a {@link HoodieMergeHandle}.
+ *
+ * @param <T> payload type of the records being merged
+ */
+public class JavaMergeHelper<T extends HoodieRecordPayload> extends AbstractMergeHelper<T, List<HoodieRecord<T>>,
+    List<HoodieKey>, List<WriteStatus>> {
+
+  private JavaMergeHelper() {
+  }
+
+  // Holder class so the shared instance is created lazily on first use.
+  private static class MergeHelperHolder {
+    private static final JavaMergeHelper JAVA_MERGE_HELPER = new JavaMergeHelper();
+  }
+
+  /**
+   * Returns the shared helper instance (despite the name, no new object is created per call).
+   */
+  public static JavaMergeHelper newInstance() {
+    return JavaMergeHelper.MergeHelperHolder.JAVA_MERGE_HELPER;
+  }
+
+  /**
+   * Reads the old base file of {@code upsertHandle} and feeds every record to the merge handle.
+   *
+   * <p>When external schema transformation is enabled, or the base file has a bootstrap base file,
+   * records are read with the schema stored in the old file; otherwise the writer schema with
+   * meta fields is used. The file reader, the merge handle and the executor are released in the
+   * {@code finally} block.
+   *
+   * @param table        table being written to; supplies the write config and Hadoop configuration
+   * @param upsertHandle merge handle that receives the old records
+   * @throws IOException     if the base file reader cannot be opened
+   * @throws HoodieException wrapping any failure raised while resolving the schema or merging
+   */
+  @Override
+  public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                       HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> upsertHandle) throws IOException {
+    final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
+    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+    HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle = upsertHandle;
+    HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
+
+    BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
+    // Open the old base file exactly once. The same reader serves the schema lookup and the
+    // record iteration, so no second reader is left unclosed.
+    HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
+    try {
+      final GenericDatumWriter<GenericRecord> gWriter;
+      final GenericDatumReader<GenericRecord> gReader;
+      Schema readSchema;
+      if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
+        readSchema = reader.getSchema();
+        gWriter = new GenericDatumWriter<>(readSchema);
+        gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetafields());
+      } else {
+        gReader = null;
+        gWriter = null;
+        readSchema = mergeHandle.getWriterSchemaWithMetafields();
+      }
+
+      final Iterator<GenericRecord> readerIterator;
+      if (baseFile.getBootstrapBaseFile().isPresent()) {
+        readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
+      } else {
+        readerIterator = reader.getRecordIterator(readSchema);
+      }
+
+      ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
+      ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
+      wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
+          Option.of(new UpdateHandler(mergeHandle)), record -> {
+        // Records are rewritten to the writer schema only when external transformation is on.
+        if (!externalSchemaTransformation) {
+          return record;
+        }
+        return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
+      });
+      wrapper.execute();
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
+      mergeHandle.close();
+      if (null != wrapper) {
+        wrapper.shutdownNow();
+      }
+    }
+  }
+
+}
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertCommitActionExecutor.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertCommitActionExecutor.java
new file mode 100644
index 0000000..cdb2527
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertCommitActionExecutor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+/**
+ * Commit action executor that upserts a list of records through the Java client.
+ *
+ * @param <T> payload type of the records being written
+ */
+public class JavaUpsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
+
+  // Records to upsert; assigned once in the constructor, like the prepped executors.
+  private final List<HoodieRecord<T>> inputRecords;
+
+  public JavaUpsertCommitActionExecutor(HoodieEngineContext context,
+                                         HoodieWriteConfig config,
+                                         HoodieTable table,
+                                         String instantTime,
+                                         List<HoodieRecord<T>> inputRecords) {
+    super(context, config, table, instantTime, WriteOperationType.UPSERT);
+    this.inputRecords = inputRecords;
+  }
+
+  /**
+   * Runs the upsert through {@link JavaWriteHelper}, combining records first when
+   * {@code config.shouldCombineBeforeUpsert()} says so and using the upsert shuffle parallelism.
+   */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    // NOTE(review): the trailing `true` presumably enables index tagging for upserts -
+    // confirm against AbstractWriteHelper#write.
+    return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
+        config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPreppedCommitActionExecutor.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPreppedCommitActionExecutor.java
new file mode 100644
index 0000000..8eea5b5
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPreppedCommitActionExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+/**
+ * Commit action executor for the UPSERT_PREPPED operation: hands the already prepared
+ * records straight to the base executor.
+ *
+ * @param <T> payload type of the records being written
+ */
+public class JavaUpsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends BaseJavaCommitActionExecutor<T> {
+
+  // Records handed over by the caller in prepared form.
+  private final List<HoodieRecord<T>> preppedRecords;
+
+  public JavaUpsertPreppedCommitActionExecutor(HoodieJavaEngineContext context,
+                                               HoodieWriteConfig config,
+                                               HoodieTable table,
+                                               String instantTime,
+                                               List<HoodieRecord<T>> preppedRecords) {
+    super(context, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
+    this.preppedRecords = preppedRecords;
+  }
+
+  /**
+   * Executes the write with the prepared records via the base executor.
+   */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    return super.execute(this.preppedRecords);
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
new file mode 100644
index 0000000..ec7ea16
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.index.HoodieIndex;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Write helper for the Java client, operating on in-memory lists of records.
+ *
+ * @param <T> payload type of the records being written
+ * @param <R> result type passed through to {@link AbstractWriteHelper}
+ */
+public class JavaWriteHelper<T extends HoodieRecordPayload,R> extends AbstractWriteHelper<T, List<HoodieRecord<T>>,
+    List<HoodieKey>, List<WriteStatus>, R> {
+
+  private JavaWriteHelper() {
+  }
+
+  // Holder class so the shared instance is created lazily on first use.
+  private static class WriteHelperHolder {
+    private static final JavaWriteHelper JAVA_WRITE_HELPER = new JavaWriteHelper();
+  }
+
+  /**
+   * Returns the shared helper instance (despite the name, no new object is created per call).
+   */
+  public static JavaWriteHelper newInstance() {
+    return WriteHelperHolder.JAVA_WRITE_HELPER;
+  }
+
+  /**
+   * Collapses records that share a key into one record by folding their payloads with
+   * {@code preCombine}.
+   *
+   * <p>Records are grouped by record key alone when the index is global, and by the full
+   * {@link HoodieKey} otherwise.
+   *
+   * @param records     incoming records, possibly containing duplicates
+   * @param index       index in use; only {@code isGlobal()} is consulted
+   * @param parallelism not used by this list-based implementation
+   * @return one record per distinct grouping key
+   */
+  @Override
+  public List<HoodieRecord<T>> deduplicateRecords(List<HoodieRecord<T>> records,
+                                                  HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> index,
+                                                  int parallelism) {
+    boolean isIndexingGlobal = index.isGlobal();
+    Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
+      HoodieKey hoodieKey = record.getKey();
+      // If index used is global, then records are expected to differ in their partitionPath
+      Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
+      return Pair.of(key, record);
+    }).collect(Collectors.groupingBy(Pair::getLeft));
+
+    return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
+      @SuppressWarnings("unchecked")
+      T reducedData = (T) rec1.getData().preCombine(rec2.getData());
+      // we cannot allow the user to change the key or partitionPath, since that will affect
+      // everything, so the key is picked from one of the two records. Under a global index the
+      // two records may carry different partition paths, so take the key of the record whose
+      // payload survived preCombine; that keeps the data paired with its own partition path.
+      HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
+      return new HoodieRecord<T>(reducedKey, reducedData);
+    }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
new file mode 100644
index 0000000..4b0fcdf
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.NumericUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Packs incoming records to be upserted, into buckets.
+ */
+public class UpsertPartitioner<T extends HoodieRecordPayload<T>> implements 
Partitioner  {
+
+  private static final Logger LOG = 
LogManager.getLogger(UpsertPartitioner.class);
+
+  /**
+   * List of all small files to be corrected.
+   */
+  protected List<SmallFile> smallFiles = new ArrayList<>();
+  /**
+   * Total number of buckets (write partitions) that the incoming workload is packed into.
+   */
+  private int totalBuckets = 0;
+  /**
+   * Stat for the current workload. Helps in determining inserts, upserts etc.
+   */
+  private WorkloadProfile profile;
+  /**
+   * Helps decide which bucket an incoming update should go to.
+   */
+  private HashMap<String, Integer> updateLocationToBucket;
+  /**
+   * Helps us pack inserts into 1 or more buckets depending on number of 
incoming records.
+   */
+  private HashMap<String, List<InsertBucketCumulativeWeightPair>> 
partitionPathToInsertBucketInfos;
+  /**
+   * Remembers what type each bucket is for later.
+   */
+  private HashMap<Integer, BucketInfo> bucketInfoMap;
+
+  protected final HoodieTable table;
+
+  protected final HoodieWriteConfig config;
+
+  /**
+   * Builds the partitioner: first assigns one bucket per update location found in the profile,
+   * then assigns buckets for the inserts, and finally logs the resulting layout.
+   */
+  public UpsertPartitioner(WorkloadProfile profile, HoodieEngineContext context, HoodieTable table,
+                           HoodieWriteConfig config) {
+    this.profile = profile;
+    this.table = table;
+    this.config = config;
+    this.updateLocationToBucket = new HashMap<>();
+    this.partitionPathToInsertBucketInfos = new HashMap<>();
+    this.bucketInfoMap = new HashMap<>();
+
+    // Updates must be assigned before inserts.
+    assignUpdates(profile);
+    assignInserts(profile, context);
+
+    String layoutSummary = "Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
+        + "Partition to insert buckets => " + partitionPathToInsertBucketInfos + ", \n"
+        + "UpdateLocations mapped to buckets =>" + updateLocationToBucket;
+    LOG.info(layoutSummary);
+  }
+
+  /**
+   * Creates one update bucket for every update location recorded in the workload profile.
+   */
+  private void assignUpdates(WorkloadProfile profile) {
+    // each update location gets a partition
+    for (Map.Entry<String, WorkloadStat> statOfPartition : profile.getPartitionPathStatMap().entrySet()) {
+      String partitionPath = statOfPartition.getKey();
+      WorkloadStat stat = statOfPartition.getValue();
+      for (String fileId : stat.getUpdateLocationToCount().keySet()) {
+        addUpdateBucket(partitionPath, fileId);
+      }
+    }
+  }
+
+  private int addUpdateBucket(String partitionPath, String fileIdHint) {
+    int bucket = totalBuckets;
+    updateLocationToBucket.put(fileIdHint, bucket);
+    BucketInfo bucketInfo = new BucketInfo();
+    bucketInfo.bucketType = BucketType.UPDATE;
+    bucketInfo.fileIdPrefix = fileIdHint;
+    bucketInfo.partitionPath = partitionPath;
+    bucketInfoMap.put(totalBuckets, bucketInfo);
+    totalBuckets++;
+    return bucket;
+  }
+
+  /**
+   * Assigns the inserts of every partition in the workload to buckets.
+   *
+   * <p>For each partition path that has inserts, the small files of that partition are topped up first (re-using the
+   * update bucket already registered for a file id, otherwise creating one via {@code addUpdateBucket}). Whatever
+   * remains unassigned is spread over newly created insert buckets. The buckets and their cumulative weights are
+   * stored in {@code partitionPathToInsertBucketInfos}, keyed by partition path.
+   *
+   * @param profile workload profile supplying the partition paths and their per-partition insert counts
+   * @param context engine context used to look up the small files of the partitions
+   */
+  private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) {
+    // for new inserts, compute buckets depending on how many records we have for each partition
+    Set<String> partitionPaths = profile.getPartitionPaths();
+    long averageRecordSize = averageBytesPerRecord(
+        table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(), config);
+    LOG.info("AvgRecordSize => " + averageRecordSize);
+
+    Map<String, List<SmallFile>> partitionSmallFilesMap =
+        getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), context);
+
+    for (String partitionPath : partitionPaths) {
+      WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
+      if (pStat.getNumInserts() > 0) {
+
+        List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
+        this.smallFiles.addAll(smallFiles);
+
+        LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
+
+        long totalUnassignedInserts = pStat.getNumInserts();
+        // parallel lists: bucketNumbers.get(i) receives recordsPerBucket.get(i) inserts
+        List<Integer> bucketNumbers = new ArrayList<>();
+        List<Long> recordsPerBucket = new ArrayList<>();
+
+        // first try packing this into one of the smallFiles
+        for (SmallFile smallFile : smallFiles) {
+          // how many average-sized records still fit before the file reaches the max file size
+          long recordsToAppend = Math.min(
+              (config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, totalUnassignedInserts);
+          if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
+            // create a new bucket or re-use an existing bucket
+            int bucket;
+            if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
+              bucket = updateLocationToBucket.get(smallFile.location.getFileId());
+              LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
+            } else {
+              bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
+              LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
+            }
+            bucketNumbers.add(bucket);
+            recordsPerBucket.add(recordsToAppend);
+            totalUnassignedInserts -= recordsToAppend;
+          }
+        }
+
+        // if we have anything more, create new insert buckets, like normal
+        if (totalUnassignedInserts > 0) {
+          long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
+          if (config.shouldAutoTuneInsertSplits()) {
+            insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
+          }
+
+          int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
+          LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts
+              + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
+          for (int b = 0; b < insertBuckets; b++) {
+            bucketNumbers.add(totalBuckets);
+            // Split evenly and hand the division remainder out one record at a time to the first buckets.
+            // Plain integer division would drop the remainder, leaving the cumulative weights short of the
+            // full range and sending the uncovered keys to the first bucket in getPartition.
+            long bucketRecords = totalUnassignedInserts / insertBuckets
+                + (b < totalUnassignedInserts % insertBuckets ? 1 : 0);
+            recordsPerBucket.add(bucketRecords);
+            BucketInfo bucketInfo = new BucketInfo();
+            bucketInfo.bucketType = BucketType.INSERT;
+            bucketInfo.partitionPath = partitionPath;
+            bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
+            bucketInfoMap.put(totalBuckets, bucketInfo);
+            totalBuckets++;
+          }
+        }
+
+        // Go over all such buckets, and assign weights as per amount of incoming inserts.
+        List<InsertBucketCumulativeWeightPair> insertBuckets = new ArrayList<>();
+        double currentCumulativeWeight = 0;
+        for (int i = 0; i < bucketNumbers.size(); i++) {
+          InsertBucket bkt = new InsertBucket();
+          bkt.bucketNumber = bucketNumbers.get(i);
+          bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
+          currentCumulativeWeight += bkt.weight;
+          insertBuckets.add(new InsertBucketCumulativeWeightPair(bkt, currentCumulativeWeight));
+        }
+        LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
+        partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets);
+      }
+    }
+  }
+
+  /**
+   * Looks up the small files of each given partition path.
+   *
+   * @param partitionPaths partition paths to inspect; may be null or empty
+   * @param context engine context used to run the per-partition lookup
+   * @return mapping from partition path to its small files; an empty map when no partition paths are given
+   */
+  private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths,
+      HoodieEngineContext context) {
+    if (partitionPaths == null || partitionPaths.size() <= 0) {
+      // nothing to look up
+      return new HashMap<>();
+    }
+    context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions");
+    return context.mapToPair(partitionPaths,
+        path -> new ImmutablePair<>(path, getSmallFiles(path)), 0);
+  }
+
+  /**
+   * Returns the base files of the given partition path whose size is below the configured small-file limit.
+   *
+   * @param partitionPath partition path to inspect
+   * @return location and size of each small base file; empty when the table has no completed commits
+   */
+  protected List<SmallFile> getSmallFiles(String partitionPath) {
+    // small files of this partition path only
+    List<SmallFile> result = new ArrayList<>();
+
+    HoodieTimeline completedCommits = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
+    if (completedCommits.empty()) {
+      // no commits yet, so there is nothing to pack inserts into
+      return result;
+    }
+
+    // query base files up to (and including) the last completed instant
+    HoodieInstant lastCompleted = completedCommits.lastInstant().get();
+    List<HoodieBaseFile> baseFiles = table.getBaseFileOnlyView()
+        .getLatestBaseFilesBeforeOrOn(partitionPath, lastCompleted.getTimestamp())
+        .collect(Collectors.toList());
+
+    for (HoodieBaseFile baseFile : baseFiles) {
+      if (baseFile.getFileSize() >= config.getParquetSmallFileLimit()) {
+        continue;
+      }
+      String baseFileName = baseFile.getFileName();
+      SmallFile small = new SmallFile();
+      small.location = new HoodieRecordLocation(FSUtils.getCommitTime(baseFileName), FSUtils.getFileId(baseFileName));
+      small.sizeBytes = baseFile.getFileSize();
+      result.add(small);
+    }
+
+    return result;
+  }
+
+  /**
+   * Looks up the bucket information registered under the given bucket number.
+   *
+   * @param bucketNumber number of the bucket
+   * @return the entry stored in {@code bucketInfoMap} for that number
+   */
+  public BucketInfo getBucketInfo(int bucketNumber) {
+    return this.bucketInfoMap.get(bucketNumber);
+  }
+
+  /**
+   * Looks up the insert buckets, paired with their cumulative weights, recorded for a partition path.
+   *
+   * @param partitionPath partition path to look up
+   * @return the entry stored in {@code partitionPathToInsertBucketInfos} for that partition path
+   */
+  public List<InsertBucketCumulativeWeightPair> getInsertBuckets(String partitionPath) {
+    return this.partitionPathToInsertBucketInfos.get(partitionPath);
+  }
+
+  /**
+   * Returns the number of partitions, which is the total number of buckets created so far.
+   */
+  @Override
+  public int getNumPartitions() {
+    return this.totalBuckets;
+  }
+
+  /**
+   * Resolves the bucket number for a record.
+   *
+   * <p>Records that carry a location go to the update bucket registered for that file id. Records without a
+   * location are spread over the insert buckets of their partition path, using a hash of the record key matched
+   * against the buckets' cumulative weights.
+   *
+   * @param key a {@code Pair<HoodieKey, Option<HoodieRecordLocation>>}
+   * @return the bucket number the record belongs to
+   */
+  @Override
+  public int getPartition(Object key) {
+    Pair<HoodieKey, Option<HoodieRecordLocation>> keyAndLocation =
+        (Pair<HoodieKey, Option<HoodieRecordLocation>>) key;
+    Option<HoodieRecordLocation> maybeLocation = keyAndLocation.getRight();
+    if (maybeLocation.isPresent()) {
+      // update: route to the bucket owning the file
+      return updateLocationToBucket.get(maybeLocation.get().getFileId());
+    }
+
+    HoodieKey hoodieKey = keyAndLocation.getLeft();
+    String partitionPath = hoodieKey.getPartitionPath();
+    List<InsertBucketCumulativeWeightPair> candidates = partitionPathToInsertBucketInfos.get(partitionPath);
+    // pick the target bucket to use based on the weights.
+    final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
+    final long keyHash = NumericUtils.getMessageDigestHash("MD5", hoodieKey.getRecordKey());
+    final double r = 1.0 * Math.floorMod(keyHash, totalInserts) / totalInserts;
+
+    int found = Collections.binarySearch(candidates, new InsertBucketCumulativeWeightPair(new InsertBucket(), r));
+    // on a miss, binarySearch returns (-(insertion point) - 1); recover the insertion point
+    int slot = found >= 0 ? found : -found - 1;
+    if (slot >= candidates.size()) {
+      // return first one, by default
+      slot = 0;
+    }
+    return candidates.get(slot).getKey().bucketNumber;
+  }
+
+  /**
+   * Estimates the average size of a record from previously written commits; the estimate is used to work out how
+   * many records pack into one file.
+   *
+   * <p>Commits are visited newest first and the first one that wrote more bytes than the size threshold (with at
+   * least one record) determines the result. If no commit qualifies, or anything goes wrong while reading commit
+   * metadata, the configured record size estimate is returned.
+   *
+   * @param commitTimeline timeline whose instants are inspected
+   * @param hoodieWriteConfig write config supplying the default estimate and the threshold inputs
+   * @return estimated bytes per record
+   */
+  protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
+    long estimate = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
+    long minBytesWritten = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold()
+        * hoodieWriteConfig.getParquetSmallFileLimit());
+    try {
+      if (commitTimeline.empty()) {
+        return estimate;
+      }
+      // Go over the reverse ordered commits to get a more recent estimate of average record size.
+      for (Iterator<HoodieInstant> it = commitTimeline.getReverseOrderedInstants().iterator(); it.hasNext();) {
+        HoodieInstant instant = it.next();
+        HoodieCommitMetadata metadata = HoodieCommitMetadata
+            .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+        long bytesWritten = metadata.fetchTotalBytesWritten();
+        long recordsWritten = metadata.fetchTotalRecordsWritten();
+        if (bytesWritten > minBytesWritten && recordsWritten > 0) {
+          estimate = (long) Math.ceil((1.0 * bytesWritten) / recordsWritten);
+          break;
+        }
+      }
+    } catch (Throwable t) {
+      // make this fail safe.
+      LOG.error("Error trying to compute average bytes/record ", t);
+    }
+    return estimate;
+  }
+}
diff --git a/hudi-examples/pom.xml b/hudi-examples/pom.xml
index ba13290..647b1b6 100644
--- a/hudi-examples/pom.xml
+++ b/hudi-examples/pom.xml
@@ -135,6 +135,12 @@
 
     <dependency>
       <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-java-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-spark-client</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git 
a/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
 
b/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
index 4a9868b..71c6408 100644
--- 
a/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
+++ 
b/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
@@ -55,7 +55,7 @@ public class HoodieExampleDataGenerator<T extends 
HoodieRecordPayload<T>> {
   public static final String[] DEFAULT_PARTITION_PATHS =
       {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, 
DEFAULT_THIRD_PARTITION_PATH};
   public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": 
\"triprec\",\"fields\": [ "
-          + "{\"name\": \"ts\",\"type\": \"double\"},{\"name\": \"uuid\", 
\"type\": \"string\"},"
+          + "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", 
\"type\": \"string\"},"
           + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": 
\"driver\", \"type\": \"string\"},"
           + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": 
\"begin_lon\", \"type\": \"double\"},"
           + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": 
\"end_lon\", \"type\": \"double\"},"
diff --git 
a/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
 
b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
new file mode 100644
index 0000000..31fccfa
--- /dev/null
+++ 
b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.java;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
+import org.apache.hudi.index.HoodieIndex;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+
+/**
+ * Simple example of {@link HoodieJavaWriteClient}: initializes a table if it does not exist yet, upserts a batch
+ * of generated inserts, then upserts those records again together with a few generated updates.
+ *
+ * <p>Usage: {@code HoodieJavaWriteClientExample <tablePath> <tableName>}
+ *
+ * <p>{@code <tablePath>} and {@code <tableName>} describe root path of hudi and table name,
+ * for example, {@code HoodieJavaWriteClientExample file:///tmp/hoodie/sample-table hoodie_rt}
+ */
+public class HoodieJavaWriteClientExample {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieJavaWriteClientExample.class);
+
+  private static String tableType = HoodieTableType.COPY_ON_WRITE.name();
+
+  /**
+   * Runs the example.
+   *
+   * @param args {@code args[0]} is the table path, {@code args[1]} is the table name
+   * @throws Exception if initializing the table or writing to it fails
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("Usage: HoodieJavaWriteClientExample <tablePath> <tableName>");
+      System.exit(1);
+    }
+    String tablePath = args[0];
+    String tableName = args[1];
+
+    // Generator of some records to be loaded in.
+    HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
+
+    Configuration hadoopConf = new Configuration();
+    // initialize the table, if not done already
+    Path path = new Path(tablePath);
+    FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
+    if (!fs.exists(path)) {
+      HoodieTableMetaClient.initTableType(hadoopConf, tablePath, HoodieTableType.valueOf(tableType),
+          tableName, HoodieAvroPayload.class.getName());
+    }
+
+    // Create the write client to write some records in
+    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
+        .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
+        .withDeleteParallelism(2).forTable(tableName)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
+    HoodieJavaWriteClient<HoodieAvroPayload> client =
+        new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
+
+    // close the client even when one of the writes fails
+    try {
+      // inserts
+      String newCommitTime = client.startCommit();
+      LOG.info("Starting commit " + newCommitTime);
+
+      List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
+      List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
+      // hand the client copies so that recordsSoFar can be written again in the next commit
+      List<HoodieRecord<HoodieAvroPayload>> writeRecords =
+          recordsSoFar.stream().map(r -> new HoodieRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
+      client.upsert(writeRecords, newCommitTime);
+
+      // updates
+      newCommitTime = client.startCommit();
+      LOG.info("Starting commit " + newCommitTime);
+      List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
+      recordsSoFar.addAll(toBeUpdated);
+      writeRecords =
+          recordsSoFar.stream().map(r -> new HoodieRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
+      client.upsert(writeRecords, newCommitTime);
+    } finally {
+      client.close();
+    }
+  }
+}

Reply via email to