This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ca9bfa2  [HUDI-2332] Add clustering and compaction in Kafka Connect Sink (#3857)
ca9bfa2 is described below

commit ca9bfa2a4000575dbaa379c91898786f040a9917
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Nov 23 00:53:28 2021 -0800

    [HUDI-2332] Add clustering and compaction in Kafka Connect Sink (#3857)
    
    * [HUDI-2332] Add clustering and compaction in Kafka Connect Sink
    
    * Disable validation check on instant time for compaction and adjust configs
    
    * Add javadocs
    
    * Add clustering and compaction config
    
    * Fix transaction causing missing records in the target table
    
    * Add debugging logs
    
    * Fix kafka offset sync in participant
    
    * Adjust how clustering and compaction are configured in kafka-connect
    
    * Fix clustering strategy
    
    * Remove irrelevant changes from other published PRs
    
    * Update clustering logic and others
    
    * Update README
    
    * Fix test failures
    
    * Fix indentation
    
    * Fix clustering config
    
    * Add JavaCustomColumnsSortPartitioner and make async compaction enabled by default
    
    * Add test for JavaCustomColumnsSortPartitioner
    
    * Add more changes after IDE sync
    
    * Update README with clarification
    
    * Fix clustering logic after rebasing
    
    * Remove unrelated changes
---
 .../apache/hudi/config/HoodieClusteringConfig.java |  48 +++-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   3 +-
 .../cluster/BaseClusteringPlanActionExecutor.java  |  34 ++-
 .../compact/ScheduleCompactionActionExecutor.java  |  16 +-
 .../apache/hudi/config/TestHoodieWriteConfig.java  |  47 +++-
 .../JavaRecentDaysClusteringPlanStrategy.java}     |  42 ++--
 .../JavaSizeBasedClusteringPlanStrategy.java       | 131 +++++++++++
 .../run/strategy/JavaExecutionStrategy.java        | 242 +++++++++++++++++++++
 .../strategy/JavaSortAndSizeExecutionStrategy.java |  70 ++++++
 .../JavaCustomColumnsSortPartitioner.java          |  62 ++++++
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |  79 ++++++-
 .../hudi/table/HoodieJavaMergeOnReadTable.java     |  22 ++
 .../org/apache/hudi/table/HoodieJavaTable.java     |   9 +
 .../cluster/JavaClusteringPlanActionExecutor.java  |  43 ++++
 .../JavaExecuteClusteringCommitActionExecutor.java | 123 +++++++++++
 .../commit/BaseJavaCommitActionExecutor.java       |   3 +-
 .../HoodieJavaMergeOnReadTableCompactor.java       |  56 +++++
 .../TestJavaBulkInsertInternalPartitioner.java     |  98 +++++++++
 .../SparkRecentDaysClusteringPlanStrategy.java     |   2 +
 .../RDDCustomColumnsSortPartitioner.java           |  30 +--
 .../cluster/SparkClusteringPlanActionExecutor.java |  37 ----
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  58 ++++-
 hudi-kafka-connect/README.md                       | 132 ++++++++++-
 hudi-kafka-connect/demo/config-sink.json           |   1 +
 .../hudi/connect/writers/KafkaConnectConfigs.java  |   9 +
 .../writers/KafkaConnectTransactionServices.java   |  29 ++-
 .../org/apache/hudi/utilities/HoodieCompactor.java |   2 +-
 27 files changed, 1316 insertions(+), 112 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 6c4a6d8..4476e4f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -22,7 +22,9 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
 
 import java.io.File;
 import java.io.FileReader;
@@ -41,6 +43,14 @@ public class HoodieClusteringConfig extends HoodieConfig {
 
   // Any strategy specific params can be saved with this prefix
   public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = 
"hoodie.clustering.plan.strategy.";
+  public static final String SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
+      
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
+  public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
+      
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
+  public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =
+      
"org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy";
+  public static final String JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY =
+      
"org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy";
 
   // Any Space-filling curves optimize(z-order/hilbert) params can be saved 
with this prefix
   public static final String LAYOUT_OPTIMIZE_PARAM_PREFIX = 
"hoodie.layout.optimize.";
@@ -59,7 +69,7 @@ public class HoodieClusteringConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> PLAN_STRATEGY_CLASS_NAME = 
ConfigProperty
       .key("hoodie.clustering.plan.strategy.class")
-      
.defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy")
+      .defaultValue(SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY)
       .sinceVersion("0.7.0")
       .withDocumentation("Config to provide a strategy class (subclass of 
ClusteringPlanStrategy) to create clustering plan "
           + "i.e select what file groups are being clustered. Default 
strategy, looks at the clustering small file size limit (determined by "
@@ -67,7 +77,7 @@ public class HoodieClusteringConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> EXECUTION_STRATEGY_CLASS_NAME = 
ConfigProperty
       .key("hoodie.clustering.execution.strategy.class")
-      
.defaultValue("org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
+      .defaultValue(SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY)
       .sinceVersion("0.7.0")
       .withDocumentation("Config to provide a strategy class (subclass of 
RunClusteringStrategy) to define how the "
           + " clustering plan is executed. By default, we sort the file groups 
in th plan by the specified columns, while "
@@ -336,6 +346,12 @@ public class HoodieClusteringConfig extends HoodieConfig {
   public static class Builder {
 
     private final HoodieClusteringConfig clusteringConfig = new 
HoodieClusteringConfig();
+    private EngineType engineType = EngineType.SPARK;
+
+    public Builder withEngineType(EngineType engineType) {
+      this.engineType = engineType;
+      return this;
+    }
 
     public Builder fromFile(File propertiesFile) throws IOException {
       try (FileReader reader = new FileReader(propertiesFile)) {
@@ -455,9 +471,37 @@ public class HoodieClusteringConfig extends HoodieConfig {
     }
 
     public HoodieClusteringConfig build() {
+      clusteringConfig.setDefaultValue(
+          PLAN_STRATEGY_CLASS_NAME, 
getDefaultPlanStrategyClassName(engineType));
+      clusteringConfig.setDefaultValue(
+          EXECUTION_STRATEGY_CLASS_NAME, 
getDefaultExecutionStrategyClassName(engineType));
       clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName());
       return clusteringConfig;
     }
+
+    private String getDefaultPlanStrategyClassName(EngineType engineType) {
+      switch (engineType) {
+        case SPARK:
+          return SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+        case FLINK:
+        case JAVA:
+          return JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+        default:
+          throw new HoodieNotSupportedException("Unsupported engine " + 
engineType);
+      }
+    }
+
+    private String getDefaultExecutionStrategyClassName(EngineType engineType) 
{
+      switch (engineType) {
+        case SPARK:
+          return SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY;
+        case FLINK:
+        case JAVA:
+          return JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY;
+        default:
+          throw new HoodieNotSupportedException("Unsupported engine " + 
engineType);
+      }
+    }
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d496610..17386e9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2182,7 +2182,8 @@ public class HoodieWriteConfig extends HoodieConfig {
       writeConfig.setDefaultOnCondition(!isCompactionConfigSet,
           
HoodieCompactionConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
       writeConfig.setDefaultOnCondition(!isClusteringConfigSet,
-          
HoodieClusteringConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
+          HoodieClusteringConfig.newBuilder().withEngineType(engineType)
+              .fromProperties(writeConfig.getProps()).build());
       writeConfig.setDefaultOnCondition(!isMetricsConfigSet, 
HoodieMetricsConfig.newBuilder().fromProperties(
           writeConfig.getProps()).build());
       writeConfig.setDefaultOnCondition(!isBootstrapConfigSet,
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java
index 97407e3..8071bfb 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java
@@ -27,10 +27,15 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -38,6 +43,8 @@ import java.util.Map;
 
 public abstract class BaseClusteringPlanActionExecutor<T extends 
HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, 
Option<HoodieClusteringPlan>> {
 
+  private static final Logger LOG = 
LogManager.getLogger(BaseClusteringPlanActionExecutor.class);
+
   private final Option<Map<String, String>> extraMetadata;
 
   public BaseClusteringPlanActionExecutor(HoodieEngineContext context,
@@ -49,7 +56,32 @@ public abstract class BaseClusteringPlanActionExecutor<T 
extends HoodieRecordPay
     this.extraMetadata = extraMetadata;
   }
 
-  protected abstract Option<HoodieClusteringPlan> createClusteringPlan();
+  protected Option<HoodieClusteringPlan> createClusteringPlan() {
+    LOG.info("Checking if clustering needs to be run on " + 
config.getBasePath());
+    Option<HoodieInstant> lastClusteringInstant = 
table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant();
+
+    int commitsSinceLastClustering = 
table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
+        
.findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"),
 Integer.MAX_VALUE)
+        .countInstants();
+    if (config.inlineClusteringEnabled() && 
config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
+      LOG.info("Not scheduling inline clustering as only " + 
commitsSinceLastClustering
+          + " commits was found since last clustering " + 
lastClusteringInstant + ". Waiting for "
+          + config.getInlineClusterMaxCommits());
+      return Option.empty();
+    }
+
+    if (config.isAsyncClusteringEnabled() && 
config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) {
+      LOG.info("Not scheduling async clustering as only " + 
commitsSinceLastClustering
+          + " commits was found since last clustering " + 
lastClusteringInstant + ". Waiting for "
+          + config.getAsyncClusterMaxCommits());
+      return Option.empty();
+    }
+
+    LOG.info("Generating clustering plan for table " + config.getBasePath());
+    ClusteringPlanStrategy strategy = (ClusteringPlanStrategy)
+        ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), 
table, context, config);
+    return strategy.generateClusteringPlan();
+  }
 
   @Override
   public Option<HoodieClusteringPlan> execute() {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 87c5a7c..5adaf5a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -68,12 +69,15 @@ public class ScheduleCompactionActionExecutor<T extends 
HoodieRecordPayload, I,
   public Option<HoodieCompactionPlan> execute() {
     if 
(!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
         && !config.getFailedWritesCleanPolicy().isLazy()) {
-      // if there are inflight writes, their instantTime must not be less than 
that of compaction instant time
-      
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
-          .ifPresent(earliestInflight -> ValidationUtils.checkArgument(
-              
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), 
HoodieTimeline.GREATER_THAN, instantTime),
-              "Earliest write inflight instant time must be later than 
compaction time. Earliest :" + earliestInflight
-                  + ", Compaction scheduled at " + instantTime));
+      // TODO(yihua): this validation is removed for Java client used by 
kafka-connect.  Need to revisit this.
+      if (config.getEngineType() != EngineType.JAVA) {
+        // if there are inflight writes, their instantTime must not be less 
than that of compaction instant time
+        
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
+            .ifPresent(earliestInflight -> ValidationUtils.checkArgument(
+                
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), 
HoodieTimeline.GREATER_THAN, instantTime),
+                "Earliest write inflight instant time must be later than 
compaction time. Earliest :" + earliestInflight
+                    + ", Compaction scheduled at " + instantTime));
+      }
       // Committed and pending compaction instants should have strictly lower 
timestamps
       List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
           
.getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index ed6b9e6..1df71e8 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -20,7 +20,6 @@ package org.apache.hudi.config;
 
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.config.HoodieWriteConfig.Builder;
-
 import org.apache.hudi.index.HoodieIndex;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -81,6 +80,52 @@ public class TestHoodieWriteConfig {
     assertEquals(HoodieIndex.IndexType.INMEMORY, writeConfig.getIndexType());
   }
 
+  @Test
+  public void testDefaultClusteringPlanStrategyClassAccordingToEngineType() {
+    // Default (as Spark)
+    HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath("/tmp").build();
+    assertEquals(
+        HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
+        writeConfig.getClusteringPlanStrategyClass());
+
+    // Spark
+    writeConfig = 
HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build();
+    assertEquals(
+        HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
+        writeConfig.getClusteringPlanStrategyClass());
+
+    // Flink and Java
+    for (EngineType engineType : new EngineType[] {EngineType.FLINK, 
EngineType.JAVA}) {
+      writeConfig = 
HoodieWriteConfig.newBuilder().withEngineType(engineType).withPath("/tmp").build();
+      assertEquals(
+              HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
+              writeConfig.getClusteringPlanStrategyClass());
+    }
+  }
+
+  @Test
+  public void 
testDefaultClusteringExecutionStrategyClassAccordingToEngineType() {
+    // Default (as Spark)
+    HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath("/tmp").build();
+    assertEquals(
+        HoodieClusteringConfig.SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY,
+        writeConfig.getClusteringExecutionStrategyClass());
+
+    // Spark
+    writeConfig = 
HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build();
+    assertEquals(
+        HoodieClusteringConfig.SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY,
+        writeConfig.getClusteringExecutionStrategyClass());
+
+    // Flink and Java
+    for (EngineType engineType : new EngineType[] {EngineType.FLINK, 
EngineType.JAVA}) {
+      writeConfig = 
HoodieWriteConfig.newBuilder().withEngineType(engineType).withPath("/tmp").build();
+      assertEquals(
+          HoodieClusteringConfig.JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY,
+          writeConfig.getClusteringExecutionStrategyClass());
+    }
+  }
+
   private ByteArrayOutputStream saveParamsIntoOutputStream(Map<String, String> 
params) throws IOException {
     Properties properties = new Properties();
     properties.putAll(params);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java
similarity index 52%
copy from 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
copy to 
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java
index 5c13277..6d9b2ee 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java
@@ -7,22 +7,24 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.client.clustering.plan.strategy;
 
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
-import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
+import org.apache.hudi.table.HoodieJavaCopyOnWriteTable;
+import org.apache.hudi.table.HoodieJavaMergeOnReadTable;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -31,24 +33,26 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 /**
- * Clustering Strategy that only looks at latest 
'daybased.lookback.partitions' partitions.
+ * Clustering Strategy that only looks at latest 
'daybased.lookback.partitions' partitions
+ * for Java engine.
  */
-public class SparkRecentDaysClusteringPlanStrategy<T extends 
HoodieRecordPayload<T>>
-    extends SparkSizeBasedClusteringPlanStrategy<T> {
-  private static final Logger LOG = 
LogManager.getLogger(SparkRecentDaysClusteringPlanStrategy.class);
+public class JavaRecentDaysClusteringPlanStrategy<T extends 
HoodieRecordPayload<T>>
+    extends JavaSizeBasedClusteringPlanStrategy<T> {
+  private static final Logger LOG = 
LogManager.getLogger(JavaRecentDaysClusteringPlanStrategy.class);
 
-  public SparkRecentDaysClusteringPlanStrategy(HoodieSparkCopyOnWriteTable<T> 
table,
-                                               HoodieSparkEngineContext 
engineContext,
-                                               HoodieWriteConfig writeConfig) {
+  public JavaRecentDaysClusteringPlanStrategy(HoodieJavaCopyOnWriteTable<T> 
table,
+                                              HoodieJavaEngineContext 
engineContext,
+                                              HoodieWriteConfig writeConfig) {
     super(table, engineContext, writeConfig);
   }
 
-  public SparkRecentDaysClusteringPlanStrategy(HoodieSparkMergeOnReadTable<T> 
table,
-                                               HoodieSparkEngineContext 
engineContext,
-                                               HoodieWriteConfig writeConfig) {
+  public JavaRecentDaysClusteringPlanStrategy(HoodieJavaMergeOnReadTable<T> 
table,
+                                              HoodieJavaEngineContext 
engineContext,
+                                              HoodieWriteConfig writeConfig) {
     super(table, engineContext, writeConfig);
   }
 
+  @Override
   protected List<String> filterPartitionPaths(List<String> partitionPaths) {
     int targetPartitionsForClustering = 
getWriteConfig().getTargetPartitionsForClustering();
     int skipPartitionsFromLatestForClustering = 
getWriteConfig().getSkipPartitionsFromLatestForClustering();
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
new file mode 100644
index 0000000..9052f03
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieJavaCopyOnWriteTable;
+import org.apache.hudi.table.HoodieJavaMergeOnReadTable;
+import 
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+
+/**
+ * Clustering Strategy for Java engine based on following.
+ * 1) Creates clustering groups based on max size allowed per group.
+ * 2) Excludes files that are greater than 'small.file.limit' from clustering 
plan.
+ */
+public class JavaSizeBasedClusteringPlanStrategy<T extends 
HoodieRecordPayload<T>>
+    extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> {
+  private static final Logger LOG = 
LogManager.getLogger(JavaSizeBasedClusteringPlanStrategy.class);
+
+  public JavaSizeBasedClusteringPlanStrategy(HoodieJavaCopyOnWriteTable<T> 
table,
+                                             HoodieJavaEngineContext 
engineContext,
+                                             HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  public JavaSizeBasedClusteringPlanStrategy(HoodieJavaMergeOnReadTable<T> 
table,
+                                             HoodieJavaEngineContext 
engineContext,
+                                             HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  @Override
+  protected Stream<HoodieClusteringGroup> 
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> 
fileSlices) {
+    List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
+    List<FileSlice> currentGroup = new ArrayList<>();
+    long totalSizeSoFar = 0;
+    HoodieWriteConfig writeConfig = getWriteConfig();
+    for (FileSlice currentSlice : fileSlices) {
+      // assume each filegroup size is ~= parquet.max.file.size
+      totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? 
currentSlice.getBaseFile().get().getFileSize() : 
writeConfig.getParquetMaxFileSize();
+      // check if max size is reached and create new group, if needed.
+      if (totalSizeSoFar >= writeConfig.getClusteringMaxBytesInGroup() && 
!currentGroup.isEmpty()) {
+        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, 
writeConfig.getClusteringTargetFileMaxBytes());
+        LOG.info("Adding one clustering group " + totalSizeSoFar + " max 
bytes: "
+                + writeConfig.getClusteringMaxBytesInGroup() + " num input 
slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+        currentGroup = new ArrayList<>();
+        totalSizeSoFar = 0;
+      }
+      currentGroup.add(currentSlice);
+      // totalSizeSoFar could be 0 when new group was created in the previous 
conditional block.
+      // reset to the size of current slice, otherwise the number of output 
file group will become 0 even though current slice is present.
+      if (totalSizeSoFar == 0) {
+        totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? 
currentSlice.getBaseFile().get().getFileSize() : 
writeConfig.getParquetMaxFileSize();
+      }
+    }
+    if (!currentGroup.isEmpty()) {
+      int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, 
writeConfig.getClusteringTargetFileMaxBytes());
+      LOG.info("Adding final clustering group " + totalSizeSoFar + " max 
bytes: "
+              + writeConfig.getClusteringMaxBytesInGroup() + " num input 
slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+      fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+    }
+    
+    return fileSliceGroups.stream().map(fileSliceGroup -> 
HoodieClusteringGroup.newBuilder()
+        .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
+        .setNumOutputFileGroups(fileSliceGroup.getRight())
+        .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
+        .build());
+  }
+
+  @Override
+  protected Map<String, String> getStrategyParams() {
+    Map<String, String> params = new HashMap<>();
+    if 
(!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) {
+      params.put(PLAN_STRATEGY_SORT_COLUMNS.key(), 
getWriteConfig().getClusteringSortColumns());
+    }
+    return params;
+  }
+
+  @Override
+  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
+    return partitionPaths;
+  }
+
+  @Override
+  protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String 
partition) {
+    return super.getFileSlicesEligibleForClustering(partition)
+        // Only files that have basefile size smaller than small file size are 
eligible.
+        .filter(slice -> 
slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < 
getWriteConfig().getClusteringSmallFileLimit());
+  }
+
+  private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) 
{
+    return (int) Math.ceil(groupSize / (double) targetFileSize);
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
new file mode 100644
index 0000000..c830925
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.clustering.run.strategy;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.JavaTaskContextSupplier;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.RewriteAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import 
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader;
+import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+
+/**
+ * Clustering strategy for Java engine.
+ */
+public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
+    extends ClusteringExecutionStrategy<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> {
+
+  private static final Logger LOG = 
LogManager.getLogger(JavaExecutionStrategy.class);
+
+  /**
+   * Creates the strategy; all three arguments are handed unchanged to the superclass constructor.
+   *
+   * @param table         Hoodie table the strategy operates on.
+   * @param engineContext Engine context.
+   * @param writeConfig   Write config.
+   */
+  public JavaExecutionStrategy(
+      HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig 
writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  /**
+   * Runs clustering for every input group of the plan, one group after another, and gathers
+   * all resulting write statuses into a single {@link HoodieWriteMetadata}.
+   */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> performClustering(
+      HoodieClusteringPlan clusteringPlan, Schema schema, String instantTime) {
+    final List<WriteStatus> allStatuses = new ArrayList<>();
+    for (HoodieClusteringGroup group : clusteringPlan.getInputGroups()) {
+      final Map<String, String> params = clusteringPlan.getStrategy().getStrategyParams();
+      // A plan without the preserve-metadata flag is treated as "do not preserve".
+      final boolean preserveMetadata =
+          Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
+      allStatuses.addAll(runClusteringForGroup(group, params, preserveMetadata, instantTime));
+    }
+
+    final HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
+    result.setWriteStatuses(allStatuses);
+    return result;
+  }
+
+  /**
+   * Execute clustering to write inputRecords into new files as defined by rules in strategy parameters.
+   * The number of new file groups created is bounded by numOutputGroups.
+   * Note that the commit is not done as part of the strategy; committing is the caller's responsibility.
+   *
+   * @param inputRecords           List of {@link HoodieRecord}.
+   * @param numOutputGroups        Number of output file groups.
+   * @param instantTime            Clustering (replace commit) instant time.
+   * @param strategyParams         Strategy parameters containing columns to sort the data by when clustering.
+   * @param schema                 Schema of the data including metadata fields.
+   * @param fileGroupIdList        File group ids; the caller in this class passes the ids of the
+   *                               input file slices of the clustering group.
+   * @param preserveHoodieMetadata Whether to preserve commit metadata while clustering.
+   * @return List of {@link WriteStatus}.
+   */
+  public abstract List<WriteStatus> performClusteringWithRecordList(
+      final List<HoodieRecord<T>> inputRecords, final int numOutputGroups, 
final String instantTime,
+      final Map<String, String> strategyParams, final Schema schema,
+      final List<HoodieFileGroupId> fileGroupIdList, final boolean 
preserveHoodieMetadata);
+
+  /**
+   * Create {@link BulkInsertPartitioner} based on strategy params.
+   *
+   * @param strategyParams Strategy parameters containing columns to sort the data by when clustering.
+   * @param schema         Schema of the data; metadata fields are added to it before it is
+   *                       handed to the partitioner.
+   * @return a {@link JavaCustomColumnsSortPartitioner} over the comma-separated sort columns when
+   *         the sort-columns parameter is present, otherwise an empty {@link Option}.
+   */
+  protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, 
String> strategyParams, Schema schema) {
+    if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
+      // The column list is split on "," only; names are not trimmed.
+      return Option.of(new JavaCustomColumnsSortPartitioner(
+          strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
+          HoodieAvroUtils.addMetadataFields(schema)));
+    } else {
+      return Option.empty();
+    }
+  }
+
+  /**
+   * Executes clustering for the group.
+   */
+  private List<WriteStatus> runClusteringForGroup(
+      HoodieClusteringGroup clusteringGroup, Map<String, String> 
strategyParams,
+      boolean preserveHoodieMetadata, String instantTime) {
+    List<HoodieRecord<T>> inputRecords = readRecordsForGroup(clusteringGroup, 
instantTime);
+    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(getWriteConfig().getSchema()));
+    List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
+        .map(info -> new HoodieFileGroupId(info.getPartitionPath(), 
info.getFileId()))
+        .collect(Collectors.toList());
+    return performClusteringWithRecordList(inputRecords, 
clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, 
readerSchema, inputFileIds, preserveHoodieMetadata);
+  }
+
+  /**
+   * Get a list of all records for the group. This includes all records from 
file slice
+   * (Apply updates from log files, if any).
+   */
+  private List<HoodieRecord<T>> readRecordsForGroup(HoodieClusteringGroup 
clusteringGroup, String instantTime) {
+    List<ClusteringOperation> clusteringOps = 
clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
+    boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> 
op.getDeltaFilePaths().size() > 0);
+    if (hasLogFiles) {
+      // if there are log files, we read all records into memory for a file 
group and apply updates.
+      return readRecordsForGroupWithLogs(clusteringOps, instantTime);
+    } else {
+      // We want to optimize reading records for case there are no log files.
+      return readRecordsForGroupBaseFiles(clusteringOps);
+    }
+  }
+
+  /**
+   * Read records from baseFiles and apply updates from the log (delta) files.
+   *
+   * <p>For every clustering operation a {@link HoodieMergedLogRecordScanner} is built over the
+   * operation's delta file paths, a base file reader is opened unless the data file path is null
+   * or empty, and all records produced by the file slice reader are appended to one list.
+   *
+   * @param clusteringOps Clustering operations (file slices) to read.
+   * @param instantTime   Passed to the scanner as the latest instant time.
+   * @return All records read from the given operations.
+   * @throws HoodieClusteringException wrapping any {@link IOException} raised while reading.
+   */
+  private List<HoodieRecord<T>> 
readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps,
+                                                            String 
instantTime) {
+    HoodieWriteConfig config = getWriteConfig();
+    HoodieTable table = getHoodieTable();
+    List<HoodieRecord<T>> records = new ArrayList<>();
+
+    clusteringOps.forEach(clusteringOp -> {
+      // Computed (and logged) once per operation; its inputs do not depend on the operation.
+      long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new 
JavaTaskContextSupplier(), config);
+      LOG.info("MaxMemoryPerCompaction run as part of clustering => " + 
maxMemoryPerCompaction);
+      try {
+        Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()));
+        // NOTE(review): neither the scanner nor the base file reader is closed in this method;
+        // confirm whether they hold resources that must be released.
+        HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+            .withFileSystem(table.getMetaClient().getFs())
+            .withBasePath(table.getMetaClient().getBasePath())
+            .withLogFilePaths(clusteringOp.getDeltaFilePaths())
+            .withReaderSchema(readerSchema)
+            .withLatestInstantTime(instantTime)
+            .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+            .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
+            .withReverseReader(config.getCompactionReverseLogReadEnabled())
+            .withBufferSize(config.getMaxDFSStreamBufferSize())
+            .withSpillableMapBasePath(config.getSpillableMapBasePath())
+            .withPartition(clusteringOp.getPartitionPath())
+            .build();
+
+        // A slice may consist of log files only, in which case there is no base file to read.
+        Option<HoodieFileReader> baseFileReader = 
StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
+            ? Option.empty()
+            : 
Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new 
Path(clusteringOp.getDataFilePath())));
+        HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+        // The record key / partition field pair is supplied only when meta fields are not populated.
+        Iterator<HoodieRecord<T>> fileSliceReader = 
getFileSliceReader(baseFileReader, scanner, readerSchema,
+            tableConfig.getPayloadClass(),
+            tableConfig.getPreCombineField(),
+            tableConfig.populateMetaFields() ? Option.empty() : 
Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
+                tableConfig.getPartitionFieldProp())));
+        fileSliceReader.forEachRemaining(records::add);
+      } catch (IOException e) {
+        throw new HoodieClusteringException("Error reading input data for " + 
clusteringOp.getDataFilePath()
+            + " and " + clusteringOp.getDeltaFilePaths(), e);
+      }
+    });
+    return records;
+  }
+
+  /**
+   * Read records from baseFiles.
+   */
+  private List<HoodieRecord<T>> 
readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
+    List<HoodieRecord<T>> records = new ArrayList<>();
+    clusteringOps.forEach(clusteringOp -> {
+      try {
+        Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(getWriteConfig().getSchema()));
+        HoodieFileReader<IndexedRecord> baseFileReader = 
HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new 
Path(clusteringOp.getDataFilePath()));
+        Iterator<IndexedRecord> recordIterator = 
baseFileReader.getRecordIterator(readerSchema);
+        recordIterator.forEachRemaining(record -> 
records.add(transform(record)));
+      } catch (IOException e) {
+        throw new HoodieClusteringException("Error reading input data for " + 
clusteringOp.getDataFilePath()
+            + " and " + clusteringOp.getDeltaFilePaths(), e);
+      }
+    });
+    return records;
+  }
+
+  /**
+   * Transform IndexedRecord into HoodieRecord.
+   */
+  private HoodieRecord<T> transform(IndexedRecord indexedRecord) {
+    GenericRecord record = (GenericRecord) indexedRecord;
+    Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
+    String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, 
keyGeneratorOpt);
+    String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, 
keyGeneratorOpt);
+    HoodieKey hoodieKey = new HoodieKey(key, partition);
+
+    HoodieRecordPayload avroPayload = new RewriteAvroPayload(record);
+    HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload);
+    return hoodieRecord;
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
new file mode 100644
index 0000000..a33af7c
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.clustering.run.strategy;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.CreateHandleFactory;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.JavaBulkInsertHelper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Clustering Strategy based on following.
+ * 1) Java execution engine.
+ * 2) Uses bulk_insert to write data into new files.
+ */
+public class JavaSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
+    extends JavaExecutionStrategy<T> {
+  private static final Logger LOG = LogManager.getLogger(JavaSortAndSizeExecutionStrategy.class);
+
+  public JavaSortAndSizeExecutionStrategy(HoodieTable table,
+                                          HoodieEngineContext engineContext,
+                                          HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  /**
+   * Writes the input records into new files with a bulk insert that runs under a derived write
+   * config: bulk-insert parallelism equals {@code numOutputGroups}, auto commit is disabled and
+   * the maximum parquet file size is the clustering target file size.
+   *
+   * @param inputRecords           Records of the clustering group.
+   * @param numOutputGroups        Number of output file groups; also used as parallelism.
+   * @param instantTime            Clustering instant time.
+   * @param strategyParams         Strategy parameters; used to pick the partitioner.
+   * @param schema                 Schema handed to the partitioner.
+   * @param fileGroupIdList        File group ids; not used by this implementation.
+   * @param preserveHoodieMetadata Forwarded to the {@link CreateHandleFactory}.
+   * @return Write statuses returned by the bulk insert.
+   */
+  @Override
+  public List<WriteStatus> performClusteringWithRecordList(
+      final List<HoodieRecord<T>> inputRecords, final int numOutputGroups,
+      final String instantTime, final Map<String, String> strategyParams, final Schema schema,
+      final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
+    LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
+    // Work on a private copy: putting the overrides straight into the object returned by
+    // getProps() would alter the strategy's own write config whenever that object is not a copy.
+    Properties props = new Properties();
+    props.putAll(getWriteConfig().getProps());
+    props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups));
+    // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
+    props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
+    props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
+    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
+    return (List<WriteStatus>) JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
+        false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
new file mode 100644
index 0000000..bb7cd5e
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+import org.apache.avro.Schema;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Bulk-insert partitioner for the Java client that orders records by the values of the
+ * configured columns. Ordering compares the string form of the extracted column values.
+ *
+ * @param <T> HoodieRecordPayload type
+ */
+public class JavaCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
+    implements BulkInsertPartitioner<List<HoodieRecord<T>>> {
+
+  private final String[] sortColumnNames;
+  private final Schema schema;
+
+  public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema) {
+    this.sortColumnNames = columnNames;
+    this.schema = schema;
+  }
+
+  /**
+   * Returns a new list holding the given records in sorted order; the partition count
+   * argument is not used.
+   */
+  @Override
+  public List<HoodieRecord<T>> repartitionRecords(
+      List<HoodieRecord<T>> records, int outputSparkPartitions) {
+    return records.stream()
+        .sorted((left, right) -> sortKeyOf(left).compareTo(sortKeyOf(right)))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public boolean arePartitionRecordsSorted() {
+    return true;
+  }
+
+  /**
+   * Extracts the sort-column values of a record and renders them as a string.
+   */
+  private String sortKeyOf(HoodieRecord<T> record) {
+    return HoodieAvroUtils.getRecordColumnValues(record, sortColumnNames, schema).toString();
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 4107adb..cead7aa 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -29,6 +29,7 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -39,17 +40,24 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieSortedMergeHandle;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.hudi.table.action.clean.CleanActionExecutor;
 import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
-import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor;
+import org.apache.hudi.table.action.cluster.JavaClusteringPlanActionExecutor;
+import 
org.apache.hudi.table.action.cluster.JavaExecuteClusteringCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaBulkInsertCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.JavaInsertOverwriteCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.JavaInsertOverwriteTableCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.commit.JavaMergeHelper;
 import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
@@ -57,10 +65,20 @@ import 
org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends 
HoodieJavaTable<T> {
+public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload>
+    extends HoodieJavaTable<T> implements HoodieCompactionHandler<T> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieJavaCopyOnWriteTable.class);
+
   protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config,
                                        HoodieEngineContext context,
                                        HoodieTableMetaClient metaClient) {
@@ -160,23 +178,23 @@ public class HoodieJavaCopyOnWriteTable<T extends 
HoodieRecordPayload> extends H
   public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext 
context,
                                                          String instantTime,
                                                          Option<Map<String, 
String>> extraMetadata) {
-    throw new HoodieNotSupportedException("ScheduleCompaction is not supported 
yet");
+    throw new HoodieNotSupportedException("ScheduleCompaction is not supported 
on a CopyOnWrite table");
   }
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> compact(HoodieEngineContext 
context,
                                                         String 
compactionInstantTime) {
-    throw new HoodieNotSupportedException("Compact is not supported yet");
+    throw new HoodieNotSupportedException("Compaction is not supported on a 
CopyOnWrite table");
   }
 
   @Override
   public Option<HoodieClusteringPlan> scheduleClustering(final 
HoodieEngineContext context, final String instantTime, final Option<Map<String, 
String>> extraMetadata) {
-    throw new HoodieNotSupportedException("Clustering is not supported yet");
+    return new JavaClusteringPlanActionExecutor<>(context, config, this, 
instantTime, extraMetadata).execute();
   }
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> cluster(final 
HoodieEngineContext context, final String clusteringInstantTime) {
-    throw new HoodieNotSupportedException("Clustering is not supported yet");
+    return new JavaExecuteClusteringCommitActionExecutor<>(context, config, 
this, clusteringInstantTime).execute();
   }
 
   @Override
@@ -235,4 +253,53 @@ public class HoodieJavaCopyOnWriteTable<T extends 
HoodieRecordPayload> extends H
     return new CopyOnWriteRestoreActionExecutor(
         context, config, this, restoreInstantTime, instantToRestore).execute();
   }
+
+  /**
+   * Merges the given new records into an existing base file: obtains a merge handle for the
+   * file and runs it through {@link #handleUpdateInternal}.
+   */
+  @Override
+  public Iterator<List<WriteStatus>> handleUpdate(
+      String instantTime, String partitionPath, String fileId,
+      Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile)
+      throws IOException {
+    return handleUpdateInternal(
+        getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile),
+        instantTime, fileId);
+  }
+
+  protected Iterator<List<WriteStatus>> 
handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String 
instantTime,
+                                                             String fileId) 
throws IOException {
+    if (upsertHandle.getOldFilePath() == null) {
+      throw new HoodieUpsertException(
+          "Error in finding the old file path at commit " + instantTime + " 
for fileId: " + fileId);
+    } else {
+      JavaMergeHelper.newInstance().runMerge(this, upsertHandle);
+    }
+
+    // TODO(yihua): This needs to be revisited
+    if (upsertHandle.getPartitionPath() == null) {
+      LOG.info("Upsert Handle has partition path as null " + 
upsertHandle.getOldFilePath() + ", "
+          + upsertHandle.writeStatuses());
+    }
+
+    return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
+  }
+
+  /**
+   * Creates the merge handle for an update: a {@link HoodieSortedMergeHandle} when
+   * {@code requireSortedRecords()} is true, a plain {@link HoodieMergeHandle} otherwise.
+   */
+  protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
+                                              Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
+    if (!requireSortedRecords()) {
+      return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+          dataFileToBeMerged, taskContextSupplier, Option.empty());
+    }
+    return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+        dataFileToBeMerged, taskContextSupplier, Option.empty());
+  }
+
+  /**
+   * Writes the given records into a new file through a {@link HoodieCreateHandle} and returns
+   * the statuses produced on close as a one-element iterator.
+   */
+  @Override
+  public Iterator<List<WriteStatus>> handleInsert(
+      String instantTime, String partitionPath, String fileId,
+      Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
+    final HoodieCreateHandle<?, ?, ?, ?> insertHandle =
+        new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier);
+    insertHandle.write();
+    final List<WriteStatus> statusesOnClose = insertHandle.close();
+    return Collections.singletonList(statusesOnClose).iterator();
+  }
 }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
index b219ba1..136c25b 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -29,9 +30,13 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import 
org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
+import 
org.apache.hudi.table.action.compact.HoodieJavaMergeOnReadTableCompactor;
+import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
+import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor;
 import 
org.apache.hudi.table.action.deltacommit.JavaUpsertPreppedDeltaCommitActionExecutor;
 
 import java.util.List;
+import java.util.Map;
 
 public class HoodieJavaMergeOnReadTable<T extends HoodieRecordPayload> extends 
HoodieJavaCopyOnWriteTable<T> {
   protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, 
HoodieEngineContext context, HoodieTableMetaClient metaClient) {
@@ -60,4 +65,21 @@ public class HoodieJavaMergeOnReadTable<T extends 
HoodieRecordPayload> extends H
     return new 
JavaBulkInsertPreppedCommitActionExecutor((HoodieJavaEngineContext) context, 
config,
         this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
   }
+
+  /**
+   * Schedules a compaction at the given instant by running a
+   * {@link ScheduleCompactionActionExecutor} with the Java merge-on-read compactor.
+   */
+  @Override
+  public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
+    return new ScheduleCompactionActionExecutor(
+        context, config, this, instantTime, extraMetadata,
+        new HoodieJavaMergeOnReadTableCompactor())
+        .execute();
+  }
+
+  /**
+   * Executes the compaction scheduled at the given instant. A fresh copy-on-write table built
+   * from this table's config, context and meta client is passed to the executor as its last
+   * argument, and the executor's result is converted to list-based write metadata.
+   */
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> compact(
+      HoodieEngineContext context, String compactionInstantTime) {
+    final HoodieJavaMergeOnReadTableCompactor compactor = new HoodieJavaMergeOnReadTableCompactor();
+    final HoodieJavaCopyOnWriteTable copyOnWriteTable =
+        new HoodieJavaCopyOnWriteTable(config, context, getMetaClient());
+    final RunCompactionActionExecutor executor = new RunCompactionActionExecutor(
+        context, config, this, compactionInstantTime, compactor, copyOnWriteTable);
+    return convertMetadata(executor.execute());
+  }
 }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index 8b0a7a9..f9c7caf 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -31,9 +32,12 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.JavaHoodieIndexFactory;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import java.util.List;
 
+import static org.apache.hudi.common.data.HoodieList.getList;
+
 public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
     extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> {
   protected HoodieJavaTable(HoodieWriteConfig config, HoodieEngineContext 
context, HoodieTableMetaClient metaClient) {
@@ -61,6 +65,11 @@ public abstract class HoodieJavaTable<T extends 
HoodieRecordPayload>
     }
   }
 
+  /**
+   * Converts write metadata whose statuses are held as {@link HoodieData} into write metadata
+   * holding the same statuses as a plain list.
+   */
+  public static HoodieWriteMetadata<List<WriteStatus>> convertMetadata(
+      HoodieWriteMetadata<HoodieData<WriteStatus>> metadata) {
+    final List<WriteStatus> statusList = getList(metadata.getWriteStatuses());
+    return metadata.clone(statusList);
+  }
+
   @Override
   protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext 
context) {
     return JavaHoodieIndexFactory.createIndex(config);
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaClusteringPlanActionExecutor.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaClusteringPlanActionExecutor.java
new file mode 100644
index 0000000..1d78ecc
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaClusteringPlanActionExecutor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+import java.util.Map;
+
+public class JavaClusteringPlanActionExecutor<T extends HoodieRecordPayload> 
extends
+    BaseClusteringPlanActionExecutor<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> {
+
+  public JavaClusteringPlanActionExecutor(
+      HoodieEngineContext context, HoodieWriteConfig config,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> table,
+      String instantTime, Option<Map<String, String>> extraMetadata) {
+    super(context, config, table, instantTime, extraMetadata);
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaExecuteClusteringCommitActionExecutor.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaExecuteClusteringCommitActionExecutor.java
new file mode 100644
index 0000000..83364bd
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaExecuteClusteringCommitActionExecutor.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import 
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
+import org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor;
+
+import org.apache.avro.Schema;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class JavaExecuteClusteringCommitActionExecutor<T extends 
HoodieRecordPayload<T>>
+    extends BaseJavaCommitActionExecutor<T> {
+
+  private final HoodieClusteringPlan clusteringPlan;
+
+  public JavaExecuteClusteringCommitActionExecutor(
+      HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table,
+      String instantTime) {
+    super(context, config, table, instantTime, WriteOperationType.CLUSTER);
+    this.clusteringPlan = ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
+        .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
+            "Unable to read clustering plan for instant: " + instantTime));
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    HoodieInstant instant = 
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
+    // Mark instant as clustering inflight
+    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, 
Option.empty());
+    table.getMetaClient().reloadActiveTimeline();
+
+    final Schema schema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()));
+    HoodieWriteMetadata<List<WriteStatus>> writeMetadata = (
+        (ClusteringExecutionStrategy<T, List<HoodieRecord<? extends 
HoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>>)
+            
ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
+                new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, 
HoodieWriteConfig.class}, table, context, config))
+        .performClustering(clusteringPlan, schema, instantTime);
+    List<WriteStatus> writeStatusList = writeMetadata.getWriteStatuses();
+    List<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
+    
writeMetadata.setWriteStats(statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
+    
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
+    validateWriteResult(writeMetadata);
+    commitOnAutoCommit(writeMetadata);
+    if (!writeMetadata.getCommitMetadata().isPresent()) {
+      HoodieCommitMetadata commitMetadata = 
CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), 
writeMetadata.getPartitionToReplaceFileIds(),
+          extraMetadata, operationType, getSchemaToStoreInCommit(), 
getCommitActionType());
+      writeMetadata.setCommitMetadata(Option.of(commitMetadata));
+    }
+    return writeMetadata;
+  }
+
+  /**
+   * Validate actions taken by clustering. In the first implementation, we 
validate at least one new file is written.
+   * But we can extend this to add more validation. E.g. number of records 
read = number of records written etc.
+   * We can also make these validations in BaseCommitActionExecutor to reuse 
pre-commit hooks for multiple actions.
+   */
+  private void validateWriteResult(HoodieWriteMetadata<List<WriteStatus>> 
writeMetadata) {
+    if (writeMetadata.getWriteStatuses().isEmpty()) {
+      throw new HoodieClusteringException("Clustering plan produced 0 
WriteStatus for " + instantTime
+          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected 
at least "
+          + 
clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+          + " write statuses");
+    }
+  }
+
+  @Override
+  protected String getCommitActionType() {
+    return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  protected Map<String, List<String>> 
getPartitionToReplacedFileIds(HoodieWriteMetadata<List<WriteStatus>> 
writeMetadata) {
+    Set<HoodieFileGroupId> newFilesWritten = 
writeMetadata.getWriteStats().get().stream()
+        .map(s -> new HoodieFileGroupId(s.getPartitionPath(), 
s.getFileId())).collect(Collectors.toSet());
+    return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
+        .filter(fg -> !newFilesWritten.contains(fg))
+        .collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), 
Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index 66cb407..2a93c50 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -126,13 +126,14 @@ public abstract class BaseJavaCommitActionExecutor<T 
extends HoodieRecordPayload
     return result;
   }
 
-  protected void updateIndex(List<WriteStatus> writeStatuses, 
HoodieWriteMetadata<List<WriteStatus>> result) {
+  protected List<WriteStatus> updateIndex(List<WriteStatus> writeStatuses, 
HoodieWriteMetadata<List<WriteStatus>> result) {
     Instant indexStartTime = Instant.now();
     // Update the index back
     List<WriteStatus> statuses = HoodieList.getList(
         table.getIndex().updateLocation(HoodieList.of(writeStatuses), context, 
table));
     result.setIndexUpdateDuration(Duration.between(indexStartTime, 
Instant.now()));
     result.setWriteStatuses(statuses);
+    return statuses;
   }
 
   @Override
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
new file mode 100644
index 0000000..30bdcda
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+
+/**
+ * Compacts a hoodie table with merge on read storage in Java engine. Computes 
all possible
+ * compactions, passes them through a CompactionFilter, executes all the 
compactions,
+ * writes a new version of base files and makes a normal commit.
+ */
+public class HoodieJavaMergeOnReadTableCompactor<T extends HoodieRecordPayload>
+    extends HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> {
+
+  @Override
+  public void preCompact(
+      HoodieTable table, HoodieTimeline pendingCompactionTimeline, String 
compactionInstantTime) {
+    HoodieInstant inflightInstant = 
HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
+    if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
+      table.rollbackInflightCompaction(inflightInstant);
+      table.getMetaClient().reloadActiveTimeline();
+    }
+  }
+
+  @Override
+  public void maybePersist(HoodieData<WriteStatus> writeStatus, 
HoodieWriteConfig config) {
+    // No OP
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
new file mode 100644
index 0000000..5d6f211
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.testutils.HoodieJavaClientTestBase;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestJavaBulkInsertInternalPartitioner extends 
HoodieJavaClientTestBase {
+  private static final Comparator<HoodieRecord> KEY_COMPARATOR =
+      Comparator.comparing(o -> (o.getPartitionPath() + "+" + 
o.getRecordKey()));
+
+  public static List<HoodieRecord> generateTestRecordsForBulkInsert(int 
numRecords) {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    List<HoodieRecord> records = dataGenerator.generateInserts("0", 
numRecords);
+    return records;
+  }
+
+  public static Map<String, Long> 
generatePartitionNumRecords(List<HoodieRecord> records) {
+    return records.stream().map(record -> record.getPartitionPath())
+        .collect(Collectors.groupingBy(Function.identity(), 
Collectors.counting()));
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {"rider", "rider,driver"})
+  public void testCustomColumnSortPartitioner(String sortColumnString) throws 
Exception {
+    String[] sortColumns = sortColumnString.split(",");
+    Comparator<HoodieRecord> columnComparator =
+        getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, 
sortColumns);
+
+    List<HoodieRecord> records = generateTestRecordsForBulkInsert(1000);
+    testBulkInsertInternalPartitioner(
+        new JavaCustomColumnsSortPartitioner(sortColumns, 
HoodieTestDataGenerator.AVRO_SCHEMA),
+        records, true, generatePartitionNumRecords(records), 
Option.of(columnComparator));
+  }
+
+  private Comparator<HoodieRecord> getCustomColumnComparator(Schema schema, 
String[] sortColumns) {
+    return Comparator.comparing(
+        record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, 
schema).toString());
+  }
+
+  private void verifyRecordAscendingOrder(List<HoodieRecord> records,
+                                          Option<Comparator<HoodieRecord>> 
comparator) {
+    List<HoodieRecord> expectedRecords = new ArrayList<>(records);
+    Collections.sort(expectedRecords, comparator.orElse(KEY_COMPARATOR));
+    assertEquals(expectedRecords, records);
+  }
+
+  private void testBulkInsertInternalPartitioner(BulkInsertPartitioner 
partitioner,
+                                                 List<HoodieRecord> records,
+                                                 boolean isSorted,
+                                                 Map<String, Long> 
expectedPartitionNumRecords,
+                                                 
Option<Comparator<HoodieRecord>> comparator) {
+    List<HoodieRecord> actualRecords =
+        (List<HoodieRecord>) partitioner.repartitionRecords(records, 1);
+    if (isSorted) {
+      // Verify global order
+      verifyRecordAscendingOrder(actualRecords, comparator);
+    }
+
+    // Verify number of records per partition path
+    assertEquals(expectedPartitionNumRecords, 
generatePartitionNumRecords(actualRecords));
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
index 5c13277..ad19824 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
 import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -49,6 +50,7 @@ public class SparkRecentDaysClusteringPlanStrategy<T extends 
HoodieRecordPayload
     super(table, engineContext, writeConfig);
   }
 
+  @Override
   protected List<String> filterPartitionPaths(List<String> partitionPaths) {
     int targetPartitionsForClustering = 
getWriteConfig().getTargetPartitionsForClustering();
     int skipPartitionsFromLatestForClustering = 
getWriteConfig().getSkipPartitionsFromLatestForClustering();
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index fb3c5ec..a36da5f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -18,18 +18,15 @@
 
 package org.apache.hudi.execution.bulkinsert;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.BulkInsertPartitioner;
-import org.apache.spark.api.java.JavaRDD;
 
-import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaRDD;
 
 /**
  * A partitioner that does sorting based on specified column values for each 
RDD partition.
@@ -57,7 +54,8 @@ public class RDDCustomColumnsSortPartitioner<T extends 
HoodieRecordPayload>
                                                      int 
outputSparkPartitions) {
     final String[] sortColumns = this.sortColumnNames;
     final SerializableSchema schema = this.serializableSchema;
-    return records.sortBy(record -> getRecordSortColumnValues(record, 
sortColumns, schema), 
+    return records.sortBy(
+        record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, 
schema),
         true, outputSparkPartitions);
   }
 
@@ -66,26 +64,6 @@ public class RDDCustomColumnsSortPartitioner<T extends 
HoodieRecordPayload>
     return true;
   }
 
-  private static Object getRecordSortColumnValues(HoodieRecord<? extends 
HoodieRecordPayload> record,
-                                                  String[] sortColumns,
-                                                  SerializableSchema schema) {
-    try {
-      GenericRecord genericRecord = (GenericRecord) 
record.getData().getInsertValue(schema.get()).get();
-      if (sortColumns.length == 1) {
-        return HoodieAvroUtils.getNestedFieldVal(genericRecord, 
sortColumns[0], true);
-      } else {
-        StringBuilder sb = new StringBuilder();
-        for (String col : sortColumns) {
-          sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, 
col, true));
-        }
-
-        return sb.toString();
-      }
-    } catch (IOException e) {
-      throw new HoodieIOException("Unable to read record with key:" + 
record.getKey(), e);
-    }
-  }
-
   private String[] getSortColumnName(HoodieWriteConfig config) {
     return config.getUserDefinedBulkInsertPartitionerSortColumns().split(",");
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
index 683d852..81a0a74 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
@@ -18,20 +18,14 @@
 
 package org.apache.hudi.table.action.cluster;
 
-import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 
 import java.util.Map;
@@ -40,8 +34,6 @@ import java.util.Map;
 public class SparkClusteringPlanActionExecutor<T extends HoodieRecordPayload> 
extends
     BaseClusteringPlanActionExecutor<T, JavaRDD<HoodieRecord<T>>, 
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
 
-  private static final Logger LOG = 
LogManager.getLogger(SparkClusteringPlanActionExecutor.class);
-
   public SparkClusteringPlanActionExecutor(HoodieEngineContext context,
                                            HoodieWriteConfig config,
                                            HoodieTable<T, 
JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
@@ -49,33 +41,4 @@ public class SparkClusteringPlanActionExecutor<T extends 
HoodieRecordPayload> ex
                                            Option<Map<String, String>> 
extraMetadata) {
     super(context, config, table, instantTime, extraMetadata);
   }
-
-  @Override
-  protected Option<HoodieClusteringPlan> createClusteringPlan() {
-    LOG.info("Checking if clustering needs to be run on " + 
config.getBasePath());
-    Option<HoodieInstant> lastClusteringInstant = 
table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant();
-
-    int commitsSinceLastClustering = 
table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
-        
.findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"),
 Integer.MAX_VALUE)
-        .countInstants();
-    if (config.inlineClusteringEnabled() && 
config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
-      LOG.info("Not scheduling inline clustering as only " + 
commitsSinceLastClustering
-          + " commits was found since last clustering " + 
lastClusteringInstant + ". Waiting for "
-          + config.getInlineClusterMaxCommits());
-      return Option.empty();
-    }
-
-    if (config.isAsyncClusteringEnabled() && 
config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) {
-      LOG.info("Not scheduling async clustering as only " + 
commitsSinceLastClustering
-          + " commits was found since last clustering " + 
lastClusteringInstant + ". Waiting for "
-          + config.getAsyncClusterMaxCommits());
-      return Option.empty();
-    }
-
-    LOG.info("Generating clustering plan for table " + config.getBasePath());
-    ClusteringPlanStrategy strategy = (ClusteringPlanStrategy)
-        ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), 
table, context, config);
-    return strategy.generateClusteringPlan();
-  }
-
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index a602b45..ff8aefe 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,8 +18,10 @@
 
 package org.apache.hudi.avro;
 
+import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -229,10 +231,10 @@ public class HoodieAvroUtils {
 
   public static Schema removeFields(Schema schema, List<String> 
fieldsToRemove) {
     List<Schema.Field> filteredFields = schema.getFields()
-                                              .stream()
-                                              .filter(field -> 
!fieldsToRemove.contains(field.name()))
-                                              .map(field -> new 
Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
-                                              .collect(Collectors.toList());
+        .stream()
+        .filter(field -> !fieldsToRemove.contains(field.name()))
+        .map(field -> new Schema.Field(field.name(), field.schema(), 
field.doc(), field.defaultVal()))
+        .collect(Collectors.toList());
     Schema filteredSchema = Schema.createRecord(schema.getName(), 
schema.getDoc(), schema.getNamespace(), false);
     filteredSchema.setFields(filteredFields);
     return filteredSchema;
@@ -289,7 +291,7 @@ public class HoodieAvroUtils {
   }
 
   public static GenericRecord addHoodieKeyToRecord(GenericRecord record, 
String recordKey, String partitionPath,
-      String fileName) {
+                                                   String fileName) {
     record.put(HoodieRecord.FILENAME_METADATA_FIELD, fileName);
     record.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPath);
     record.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey);
@@ -551,7 +553,7 @@ public class HoodieAvroUtils {
       } else if (fieldSchema.getType() == Schema.Type.BYTES) {
         ByteBuffer byteBuffer = (ByteBuffer) fieldValue;
         BigDecimal convertedValue = decimalConversion.fromBytes(byteBuffer, 
fieldSchema,
-                LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
+            LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
         byteBuffer.rewind();
         return convertedValue;
       }
@@ -570,9 +572,51 @@ public class HoodieAvroUtils {
    * @return sanitized name
    */
   public static String sanitizeName(String name) {
-    if (name.substring(0,1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
+    if (name.substring(0, 1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
       name = name.replaceFirst(INVALID_AVRO_FIRST_CHAR_IN_NAMES, 
MASK_FOR_INVALID_CHARS_IN_NAMES);
     }
     return name.replaceAll(INVALID_AVRO_CHARS_IN_NAMES, 
MASK_FOR_INVALID_CHARS_IN_NAMES);
   }
+
+  /**
+   * Gets record column values into one object.
+   *
+   * @param record  Hoodie record.
+   * @param columns Names of the columns to get values.
+   * @param schema  {@link Schema} instance.
+   * @return Column value if a single column, or the String values concatenated 
without a separator.
+   */
+  public static Object getRecordColumnValues(HoodieRecord<? extends 
HoodieRecordPayload> record,
+                                             String[] columns,
+                                             Schema schema) {
+    try {
+      GenericRecord genericRecord = (GenericRecord) 
record.getData().getInsertValue(schema).get();
+      if (columns.length == 1) {
+        return HoodieAvroUtils.getNestedFieldVal(genericRecord, columns[0], 
true);
+      } else {
+        StringBuilder sb = new StringBuilder();
+        for (String col : columns) {
+          sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, 
col, true));
+        }
+
+        return sb.toString();
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Unable to read record with key:" + 
record.getKey(), e);
+    }
+  }
+
+  /**
+   * Gets record column values into one object.
+   *
+   * @param record  Hoodie record.
+   * @param columns Names of the columns to get values.
+   * @param schema  {@link SerializableSchema} instance.
+   * @return Column value if a single column, or the String values concatenated 
without a separator.
+   */
+  public static Object getRecordColumnValues(HoodieRecord<? extends 
HoodieRecordPayload> record,
+                                             String[] columns,
+                                             SerializableSchema schema) {
+    return getRecordColumnValues(record, columns, schema.get());
+  }
 }
diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md
index f5087ce..1754eb8 100644
--- a/hudi-kafka-connect/README.md
+++ b/hudi-kafka-connect/README.md
@@ -106,7 +106,7 @@ to generate, with each batch containing a number of 
messages and idle time betwe
 bash setupKafka.sh -n <num_kafka_messages_per_batch> -b <num_batches>
 ```
 
-### 4 - Run the Sink connector worker (multiple workers can be run)
+### 5 - Run the Sink connector worker (multiple workers can be run)
 
 The Kafka connect is a distributed platform, with the ability to run one or 
more workers (each running multiple tasks) 
 that parallely process the records from the Kafka partitions for the same 
topic. We provide a properties file with 
@@ -120,7 +120,7 @@ cd $KAFKA_HOME
 ./bin/connect-distributed.sh 
$HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
 ```
 
-### 5- To add the Hudi Sink to the Connector (delete it if you want to 
re-configure)
+### 6 - To add the Hudi Sink to the Connector (delete it if you want to 
re-configure)
 
 Once the Connector has started, it will not run the Sink, until the Hudi sink 
is added using the web api. The following 
 curl APIs can be used to delete and add a new Hudi Sink. Again, a default 
configuration is provided for the Hudi Sink, 
@@ -170,4 +170,132 @@ total 5168
 -rw-r--r--  1 user  wheel  440214 Sep 13 21:43 
E200FA75DCD1CED60BE86BCE6BF5D23A-0_0-0-0_20210913214114.parquet
 ```
 
+### 7 - Run async compaction and clustering if scheduled
 
+When using Merge-On-Read (MOR) as the table type, async compaction and 
clustering can be scheduled when the Sink is
+running. Inline compaction and clustering are disabled by default for 
performance reasons. By default, async
+compaction scheduling is enabled, and you can disable it by setting 
`hoodie.kafka.compaction.async.enable` to `false`.
+Async clustering scheduling is disabled by default, and you can enable it by 
setting `hoodie.clustering.async.enabled`
+to `true`.
+
+The Sink only schedules compaction and clustering when necessary; for 
performance reasons, it does not execute them. You need
+to execute the scheduled compaction and clustering using separate Spark jobs 
or the Hudi CLI.
+
+After the compaction is scheduled, you can see the requested compaction 
instant (`20211111111410.compaction.requested`)
+below:
+
+```
+ls -l /tmp/hoodie/hudi-test-topic/.hoodie
+total 280
+-rw-r--r--  1 user  wheel  21172 Nov 11 11:09 20211111110807.deltacommit
+-rw-r--r--  1 user  wheel      0 Nov 11 11:08 
20211111110807.deltacommit.inflight
+-rw-r--r--  1 user  wheel      0 Nov 11 11:08 
20211111110807.deltacommit.requested
+-rw-r--r--  1 user  wheel  22458 Nov 11 11:11 20211111110940.deltacommit
+-rw-r--r--  1 user  wheel      0 Nov 11 11:09 
20211111110940.deltacommit.inflight
+-rw-r--r--  1 user  wheel      0 Nov 11 11:09 
20211111110940.deltacommit.requested
+-rw-r--r--  1 user  wheel  21445 Nov 11 11:13 20211111111110.deltacommit
+-rw-r--r--  1 user  wheel      0 Nov 11 11:11 
20211111111110.deltacommit.inflight
+-rw-r--r--  1 user  wheel      0 Nov 11 11:11 
20211111111110.deltacommit.requested
+-rw-r--r--  1 user  wheel  24943 Nov 11 11:14 20211111111303.deltacommit
+-rw-r--r--  1 user  wheel      0 Nov 11 11:13 
20211111111303.deltacommit.inflight
+-rw-r--r--  1 user  wheel      0 Nov 11 11:13 
20211111111303.deltacommit.requested
+-rw-r--r--  1 user  wheel   9885 Nov 11 11:14 
20211111111410.compaction.requested
+-rw-r--r--  1 user  wheel  21192 Nov 11 11:15 20211111111411.deltacommit
+-rw-r--r--  1 user  wheel      0 Nov 11 11:14 
20211111111411.deltacommit.inflight
+-rw-r--r--  1 user  wheel      0 Nov 11 11:14 
20211111111411.deltacommit.requested
+-rw-r--r--  1 user  wheel      0 Nov 11 11:15 
20211111111530.deltacommit.inflight
+-rw-r--r--  1 user  wheel      0 Nov 11 11:15 
20211111111530.deltacommit.requested
+drwxr-xr-x  2 user  wheel     64 Nov 11 11:08 archived
+-rw-r--r--  1 user  wheel    387 Nov 11 11:08 hoodie.properties
+```
+
+Then you can run the async compaction job with `HoodieCompactor` and 
`spark-submit`:
+
+```
+spark-submit \
+  --class org.apache.hudi.utilities.HoodieCompactor \
+  
hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.10.0-SNAPSHOT.jar
 \
+  --base-path /tmp/hoodie/hudi-test-topic \
+  --table-name hudi-test-topic \
+  --schema-file /Users/user/repo/hudi/docker/demo/config/schema.avsc \
+  --instant-time 20211111111410 \
+  --parallelism 2 \
+  --spark-memory 1g
+```
+
+Note that you don't have to provide the instant time through `--instant-time`. 
If it is omitted, the earliest scheduled
+compaction is executed.
+
+Alternatively, you can use Hudi CLI to execute compaction:
+
+```
+hudi-> connect --path /tmp/hoodie/hudi-test-topic
+hudi:hudi-test-topic-> compactions show all
+╔═════════════════════════╤═══════════╤═══════════════════════════════╗
+║ Compaction Instant Time │ State     │ Total FileIds to be Compacted ║
+╠═════════════════════════╪═══════════╪═══════════════════════════════╣
+║ 20211111111410          │ REQUESTED │ 12                            ║
+╚═════════════════════════╧═══════════╧═══════════════════════════════╝
+
+compaction validate --instant 20211111111410
+compaction run --compactionInstant 20211111111410 --parallelism 2 
--schemaFilePath /Users/user/repo/hudi/docker/demo/config/schema.avsc
+```
+
+Similarly, you can see the requested clustering instant 
(`20211111111813.replacecommit.requested`) after it is scheduled
+by the Sink:
+
+```
+ls -l /tmp/hoodie/hudi-test-topic/.hoodie
+total 736
+-rw-r--r--  1 user  wheel  24943 Nov 11 11:14 20211111111303.deltacommit
+-rw-r--r--  1 user  wheel      0 Nov 11 11:13 
20211111111303.deltacommit.inflight
+-rw-r--r--  1 user  wheel      0 Nov 11 11:13 
20211111111303.deltacommit.requested
+-rw-r--r--  1 user  wheel  18681 Nov 11 11:17 20211111111410.commit
+-rw-r--r--  1 user  wheel      0 Nov 11 11:17 
20211111111410.compaction.inflight
+-rw-r--r--  1 user  wheel   9885 Nov 11 11:14 
20211111111410.compaction.requested
+-rw-r--r--  1 user  wheel  21192 Nov 11 11:15 20211111111411.deltacommit
+-rw-r--r--  1 user  wheel      0 Nov 11 11:14 
20211111111411.deltacommit.inflight
+-rw-r--r--  1 user  wheel      0 Nov 11 11:14 
20211111111411.deltacommit.requested
+-rw-r--r--  1 user  wheel  22460 Nov 11 11:17 20211111111530.deltacommit
+-rw-r--r--  1 user  wheel      0 Nov 11 11:15 
20211111111530.deltacommit.inflight
+-rw-r--r--  1 user  wheel      0 Nov 11 11:15 
20211111111530.deltacommit.requested
+-rw-r--r--  1 user  wheel  21357 Nov 11 11:18 20211111111711.deltacommit
+-rw-r--r--  1 user  wheel      0 Nov 11 11:17 
20211111111711.deltacommit.inflight
+-rw-r--r--  1 user  wheel      0 Nov 11 11:17 
20211111111711.deltacommit.requested
+-rw-r--r--  1 user  wheel   6516 Nov 11 11:18 
20211111111813.replacecommit.requested
+-rw-r--r--  1 user  wheel  26070 Nov 11 11:20 20211111111815.deltacommit
+-rw-r--r--  1 user  wheel      0 Nov 11 11:18 
20211111111815.deltacommit.inflight
+-rw-r--r--  1 user  wheel      0 Nov 11 11:18 
20211111111815.deltacommit.requested
+```
+
+Then you can run the async clustering job with `HoodieClusteringJob` and 
`spark-submit`:
+
+```
+spark-submit \
+  --class org.apache.hudi.utilities.HoodieClusteringJob \
+  
hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.10.0-SNAPSHOT.jar
 \
+  --props clusteringjob.properties \
+  --mode execute \
+  --base-path /tmp/hoodie/hudi-test-topic \
+  --table-name hudi-test-topic \
+  --instant-time 20211111111813 \
+  --spark-memory 1g
+```
+
+Sample `clusteringjob.properties`:
+
+```
+hoodie.datasource.write.recordkey.field=volume
+hoodie.datasource.write.partitionpath.field=date
+hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/hudi-test-topic/versions/latest
+
+hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
+hoodie.clustering.plan.strategy.small.file.limit=629145600
+hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
+hoodie.clustering.plan.strategy.sort.columns=volume
+
+hoodie.write.concurrency.mode=single_writer
+```
+
+Note that you don't have to provide the instant time through `--instant-time`. 
If it is omitted, the earliest scheduled
+clustering is executed.
diff --git a/hudi-kafka-connect/demo/config-sink.json 
b/hudi-kafka-connect/demo/config-sink.json
index 2d2be00..12a2f50 100644
--- a/hudi-kafka-connect/demo/config-sink.json
+++ b/hudi-kafka-connect/demo/config-sink.json
@@ -10,6 +10,7 @@
                "topics": "hudi-test-topic",
                "hoodie.table.name": "hudi-test-topic",
                "hoodie.table.type": "MERGE_ON_READ",
+               "hoodie.metadata.enable": "false",
                "hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic",
                "hoodie.datasource.write.recordkey.field": "volume",
                "hoodie.datasource.write.partitionpath.field": "date",
diff --git 
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
 
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
index 773ce1e..714d4d2 100644
--- 
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
+++ 
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
@@ -73,6 +73,11 @@ public class KafkaConnectConfigs extends HoodieConfig {
           + "the coordinator will wait for the write statuses from all the 
partitions"
           + "to ignore the current commit and start a new commit.");
 
+  public static final ConfigProperty<String> ASYNC_COMPACT_ENABLE = 
ConfigProperty
+      .key("hoodie.kafka.compaction.async.enable")
+      .defaultValue("true")
+      .withDocumentation("Controls whether async compaction should be turned 
on for MOR table writing.");
+
   public static final ConfigProperty<String> META_SYNC_ENABLE = ConfigProperty
       .key("hoodie.meta.sync.enable")
       .defaultValue("false")
@@ -121,6 +126,10 @@ public class KafkaConnectConfigs extends HoodieConfig {
     return getString(KAFKA_VALUE_CONVERTER);
   }
 
+  public Boolean isAsyncCompactEnabled() {
+    return getBoolean(ASYNC_COMPACT_ENABLE); // key "hoodie.kafka.compaction.async.enable", declared default "true"
+  }
+
   public Boolean isMetaSyncEnabled() {
     return getBoolean(META_SYNC_ENABLE);
   }
diff --git 
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
 
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
index 8039e56..7ec3034 100644
--- 
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
+++ 
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.HoodieJavaWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -54,6 +55,8 @@ public class KafkaConnectTransactionServices implements 
ConnectTransactionServic
 
   private final Option<HoodieTableMetaClient> tableMetaClient;
   private final Configuration hadoopConf;
+  private final HoodieWriteConfig writeConfig;
+  private final KafkaConnectConfigs connectConfigs;
   private final String tableBasePath;
   private final String tableName;
   private final HoodieEngineContext context;
@@ -61,8 +64,11 @@ public class KafkaConnectTransactionServices implements 
ConnectTransactionServic
   private final HoodieJavaWriteClient<HoodieAvroPayload> javaClient;
 
   public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) 
throws HoodieException {
-    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
-        .withProperties(connectConfigs.getProps()).build();
+    this.connectConfigs = connectConfigs;
+    this.writeConfig = HoodieWriteConfig.newBuilder()
+        .withEngineType(EngineType.JAVA)
+        .withProperties(connectConfigs.getProps())
+        .build();
 
     tableBasePath = writeConfig.getBasePath();
     tableName = writeConfig.getTableName();
@@ -95,6 +101,7 @@ public class KafkaConnectTransactionServices implements 
ConnectTransactionServic
     }
   }
 
+  @Override
   public String startCommit() {
     String newCommitTime = javaClient.startCommit();
     javaClient.transitionInflight(newCommitTime);
@@ -102,11 +109,23 @@ public class KafkaConnectTransactionServices implements 
ConnectTransactionServic
     return newCommitTime;
   }
 
+  @Override
   public void endCommit(String commitTime, List<WriteStatus> writeStatuses, 
Map<String, String> extraMetadata) {
     javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
     LOG.info("Ending Hudi commit " + commitTime);
+
+    // After the commit, schedule clustering and compaction as needed; only the schedule* calls are made here.
+    if (writeConfig.isAsyncClusteringEnabled()) { // flag read from the HoodieWriteConfig built in the constructor
+      javaClient.scheduleClustering(Option.empty()).ifPresent(
+          instantTs -> LOG.info("Scheduled clustering at instant time:" + 
instantTs));
+    }
+    if (isAsyncCompactionEnabled()) { // MERGE_ON_READ table and the connector's async-compaction flag is on
+      javaClient.scheduleCompaction(Option.empty()).ifPresent(
+          instantTs -> LOG.info("Scheduled compaction at instant time:" + 
instantTs));
+    }
   }
 
+  @Override
   public Map<String, String> fetchLatestExtraCommitMetadata() {
     if (tableMetaClient.isPresent()) {
       Option<HoodieCommitMetadata> metadata = 
KafkaConnectUtils.getCommitMetadataForLatestInstant(tableMetaClient.get());
@@ -119,4 +138,10 @@ public class KafkaConnectTransactionServices implements 
ConnectTransactionServic
     }
     throw new HoodieException("Fatal error retrieving Hoodie Extra Metadata 
since Table Meta Client is absent");
   }
+
+  private boolean isAsyncCompactionEnabled() { // true only when all three conditions below hold
+    return tableMetaClient.isPresent() // a table meta client is available
+        && 
HoodieTableType.MERGE_ON_READ.equals(tableMetaClient.get().getTableType()) // table type is MERGE_ON_READ
+        && connectConfigs.isAsyncCompactEnabled(); // connector-level flag from KafkaConnectConfigs
+  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index 75f55bb..ce69eff 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -164,7 +164,7 @@ public class HoodieCompactor {
     // Get schema.
     SparkRDDWriteClient client =
         UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, 
Option.of(cfg.strategyClassName), props);
-    if (cfg.compactionInstantTime == null) {
+    if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
       throw new IllegalArgumentException("No instant time is provided for 
scheduling compaction. "
           + "Please specify the compaction instant time by using 
--instant-time.");
     }

Reply via email to