This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5804ad8  [HUDI-1483] Support async clustering for deltastreamer and 
Spark streaming (#3142)
5804ad8 is described below

commit 5804ad8e32ae05758ebc5e47f5d4fb4db371ab52
Author: Sagar Sumit <[email protected]>
AuthorDate: Mon Jul 12 00:13:38 2021 +0530

    [HUDI-1483] Support async clustering for deltastreamer and Spark streaming 
(#3142)
    
    - Integrate async clustering service with HoodieDeltaStreamer and 
HoodieStreamingSink
    - Added methods in HoodieAsyncService to reuse code
---
 .../apache/hudi/async/AsyncClusteringService.java  | 100 +++++++++++++++++++++
 .../org/apache/hudi/async/AsyncCompactService.java |  56 +-----------
 .../org/apache/hudi/async/HoodieAsyncService.java  |  60 ++++++++++++-
 .../hudi/client/AbstractClusteringClient.java      |  55 ++++++++++++
 .../apache/hudi/config/HoodieClusteringConfig.java |  13 ++-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +
 .../hudi/async/SparkAsyncClusteringService.java    |  39 ++++++++
 .../hudi/client/HoodieSparkClusteringClient.java   |  56 ++++++++++++
 .../cluster/SparkClusteringPlanActionExecutor.java |  11 ++-
 .../apache/hudi/common/util/ClusteringUtils.java   |   4 +
 .../main/java/org/apache/hudi/DataSourceUtils.java |   6 ++
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  12 +++
 .../SparkStreamingAsyncClusteringService.java      |  42 +++++++++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  48 +++++++---
 .../org/apache/hudi/HoodieStreamingSink.scala      |  55 ++++++++++--
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   2 +
 .../hudi-spark/src/test/java/HoodieJavaApp.java    |   3 +
 .../src/test/java/HoodieJavaStreamingApp.java      |   1 +
 .../hudi/functional/TestStructuredStreaming.scala  |  48 ++++++++--
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  20 +++++
 .../deltastreamer/HoodieDeltaStreamer.java         |  83 ++++++++++++++---
 .../HoodieMultiTableDeltaStreamer.java             |   6 ++
 .../functional/TestHoodieDeltaStreamer.java        |  98 +++++++++++++++-----
 23 files changed, 710 insertions(+), 112 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
new file mode 100644
index 0000000..b9707bb
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.AbstractClusteringClient;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+
+/**
+ * Async clustering service that runs clustering for pending instants on a dedicated executor thread.
+ * Currently, only one clustering thread is allowed to run at any time.
+ */
+public abstract class AsyncClusteringService extends HoodieAsyncService {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class);
+
+  // Size of the clustering thread pool; the constructor pins it to 1.
+  private final int maxConcurrentClustering;
+  // Client built by createClusteringClient that executes clustering for a single instant.
+  private transient AbstractClusteringClient clusteringClient;
+
+  /**
+   * Creates a clustering service that does not run in daemon mode.
+   *
+   * @param writeClient write client handed to {@link #createClusteringClient(AbstractHoodieWriteClient)}
+   */
+  public AsyncClusteringService(AbstractHoodieWriteClient writeClient) {
+    this(writeClient, false);
+  }
+
+  /**
+   * Creates a clustering service.
+   *
+   * @param writeClient     write client handed to {@link #createClusteringClient(AbstractHoodieWriteClient)}
+   * @param runInDaemonMode passed to the base service; the clustering thread is marked daemon accordingly
+   */
+  public AsyncClusteringService(AbstractHoodieWriteClient writeClient, boolean runInDaemonMode) {
+    super(runInDaemonMode);
+    this.clusteringClient = createClusteringClient(writeClient);
+    this.maxConcurrentClustering = 1;
+  }
+
+  /**
+   * Builds the clustering client that will execute clustering for each fetched instant.
+   *
+   * @param client write client to wrap
+   * @return the clustering client used by this service
+   */
+  protected abstract AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client);
+
+  /**
+   * Start clustering service.
+   *
+   * @return pair of the future tracking all clustering workers and the executor running them
+   */
+  @Override
+  protected Pair<CompletableFuture, ExecutorService> startService() {
+    ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentClustering,
+        r -> {
+          Thread t = new Thread(r, "async_clustering_thread");
+          t.setDaemon(isRunInDaemonMode());
+          return t;
+        });
+
+    return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentClustering).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
+      try {
+        // Keep draining pending instants until a shutdown is requested.
+        while (!isShutdownRequested()) {
+          final HoodieInstant instant = fetchNextAsyncServiceInstant();
+          // A null instant means nothing was queued before the fetch returned; loop and re-check shutdown.
+          if (null != instant) {
+            LOG.info("Starting clustering for instant " + instant);
+            clusteringClient.cluster(instant);
+            LOG.info("Finished clustering for instant " + instant);
+          }
+        }
+        LOG.info("Clustering executor shutting down properly");
+      } catch (InterruptedException ie) {
+        LOG.warn("Clustering executor got interrupted exception! Stopping", ie);
+        // Restore the interrupt status so code further up the stack can still observe the interruption.
+        Thread.currentThread().interrupt();
+      } catch (IOException e) {
+        LOG.error("Clustering executor failed", e);
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+      return true;
+    }, executor)).toArray(CompletableFuture[]::new)), executor);
+  }
+
+  /**
+   * Update the write client to be used for clustering.
+   *
+   * @param writeClient new write client forwarded to the clustering client
+   */
+  public synchronized void updateWriteClient(AbstractHoodieWriteClient writeClient) {
+    this.clusteringClient.updateWriteClient(writeClient);
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
index 1bb5daa..2f63297 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
@@ -24,18 +24,14 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.IntStream;
 
 /**
@@ -54,9 +50,6 @@ public abstract class AsyncCompactService extends 
HoodieAsyncService {
   private final int maxConcurrentCompaction;
   private transient AbstractCompactor compactor;
   protected transient HoodieEngineContext context;
-  private transient BlockingQueue<HoodieInstant> pendingCompactions = new 
LinkedBlockingQueue<>();
-  private transient ReentrantLock queueLock = new ReentrantLock();
-  private transient Condition consumed = queueLock.newCondition();
 
   public AsyncCompactService(HoodieEngineContext context, 
AbstractHoodieWriteClient client) {
     this(context, client, false);
@@ -72,51 +65,6 @@ public abstract class AsyncCompactService extends 
HoodieAsyncService {
   protected abstract AbstractCompactor 
createCompactor(AbstractHoodieWriteClient client);
 
   /**
-   * Enqueues new Pending compaction.
-   */
-  public void enqueuePendingCompaction(HoodieInstant instant) {
-    pendingCompactions.add(instant);
-  }
-
-  /**
-   * Wait till outstanding pending compactions reduces to the passed in value.
-   *
-   * @param numPendingCompactions Maximum pending compactions allowed
-   * @throws InterruptedException
-   */
-  public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) 
throws InterruptedException {
-    try {
-      queueLock.lock();
-      while (!isShutdown() && (pendingCompactions.size() > 
numPendingCompactions)) {
-        consumed.await();
-      }
-    } finally {
-      queueLock.unlock();
-    }
-  }
-
-  /**
-   * Fetch Next pending compaction if available.
-   *
-   * @return
-   * @throws InterruptedException
-   */
-  private HoodieInstant fetchNextCompactionInstant() throws 
InterruptedException {
-    LOG.info("Compactor waiting for next instant for compaction upto 60 
seconds");
-    HoodieInstant instant = pendingCompactions.poll(10, TimeUnit.SECONDS);
-    if (instant != null) {
-      try {
-        queueLock.lock();
-        // Signal waiting thread
-        consumed.signal();
-      } finally {
-        queueLock.unlock();
-      }
-    }
-    return instant;
-  }
-
-  /**
    * Start Compaction Service.
    */
   @Override
@@ -134,7 +82,7 @@ public abstract class AsyncCompactService extends 
HoodieAsyncService {
         context.setProperty(EngineProperty.COMPACTION_POOL_NAME, 
COMPACT_POOL_NAME);
 
         while (!isShutdownRequested()) {
-          final HoodieInstant instant = fetchNextCompactionInstant();
+          final HoodieInstant instant = fetchNextAsyncServiceInstant();
 
           if (null != instant) {
             LOG.info("Starting Compaction for instant " + instant);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
index 32dd042..85e0081 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
@@ -18,21 +18,26 @@
 
 package org.apache.hudi.async;
 
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.Serializable;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 
 /**
- * Base Class for running clean/delta-sync/compaction in separate thread and 
controlling their life-cycle.
+ * Base Class for running clean/delta-sync/compaction/clustering in separate 
thread and controlling their life-cycle.
  */
 public abstract class HoodieAsyncService implements Serializable {
 
@@ -50,6 +55,12 @@ public abstract class HoodieAsyncService implements 
Serializable {
   private transient CompletableFuture future;
   // Run in daemon mode
   private final boolean runInDaemonMode;
+  // Queue to hold pending compaction/clustering instants
+  private transient BlockingQueue<HoodieInstant> pendingInstants = new 
LinkedBlockingQueue<>();
+  // Mutex lock for synchronized access to pendingInstants queue
+  private transient ReentrantLock queueLock = new ReentrantLock();
+  // Condition instance to use with the queueLock
+  private transient Condition consumed = queueLock.newCondition();
 
   protected HoodieAsyncService() {
     this(false);
@@ -165,4 +176,51 @@ public abstract class HoodieAsyncService implements 
Serializable {
   public boolean isRunInDaemonMode() {
     return runInDaemonMode;
   }
+
+  /**
+   * Wait till outstanding pending compaction/clustering reduces to the passed in value,
+   * or until {@code isShutdown()} reports true.
+   *
+   * @param numPending Maximum pending compactions/clustering allowed
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   */
+  public void waitTillPendingAsyncServiceInstantsReducesTo(int numPending) throws InterruptedException {
+    // Acquire the lock before entering try, so that finally only ever releases a lock that is actually held.
+    queueLock.lock();
+    try {
+      // Re-check the condition after every wake-up; the wait is signalled from fetchNextAsyncServiceInstant.
+      while (!isShutdown() && (pendingInstants.size() > numPending)) {
+        consumed.await();
+      }
+    } finally {
+      queueLock.unlock();
+    }
+  }
+
+  /**
+   * Enqueues new pending clustering instant.
+   * @param instant {@link HoodieInstant} to enqueue.
+   */
+  public void enqueuePendingAsyncServiceInstant(HoodieInstant instant) {
+    LOG.info("Enqueuing new pending clustering instant: " + 
instant.getTimestamp());
+    pendingInstants.add(instant);
+  }
+
+  /**
+   * Fetch next pending compaction/clustering instant if available.
+   * Waits up to 10 seconds for an instant to be queued before giving up.
+   *
+   * @return {@link HoodieInstant} corresponding to the next pending compaction/clustering,
+   *         or {@code null} if none was queued within the wait.
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   */
+  HoodieInstant fetchNextAsyncServiceInstant() throws InterruptedException {
+    LOG.info("Waiting for next instant upto 10 seconds");
+    HoodieInstant instant = pendingInstants.poll(10, TimeUnit.SECONDS);
+    if (instant != null) {
+      // Acquire the lock before entering try, so that finally only ever releases a lock that is actually held.
+      queueLock.lock();
+      try {
+        // Signal waiting thread
+        consumed.signal();
+      } finally {
+        queueLock.unlock();
+      }
+    }
+    return instant;
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java
new file mode 100644
index 0000000..34234f5
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Base class for clients that execute clustering for a given instant on behalf of a write client.
+ */
+public abstract class AbstractClusteringClient<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  // Write client used to run clustering; transient, and replaceable through updateWriteClient.
+  protected transient AbstractHoodieWriteClient<T, I, K, O> clusteringClient;
+
+  /**
+   * @param clusteringClient write client that clustering will be run with
+   */
+  public AbstractClusteringClient(AbstractHoodieWriteClient<T, I, K, O> clusteringClient) {
+    this.clusteringClient = clusteringClient;
+  }
+
+  /**
+   * Executes clustering for the supplied instant.
+   *
+   * @param instant instant to run clustering for
+   * @throws IOException declared for implementations to signal I/O failures
+   */
+  public abstract void cluster(HoodieInstant instant) throws IOException;
+
+  /**
+   * Replaces the write client used by async clustering.
+   *
+   * @param writeClient write client to use from now on
+   */
+  public void updateWriteClient(AbstractHoodieWriteClient<T, I, K, O> writeClient) {
+    this.clusteringClient = writeClient;
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 86d26d3..a750fa2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -53,7 +53,13 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .key("hoodie.clustering.inline.max.commits")
       .defaultValue("4")
       .sinceVersion("0.7.0")
-      .withDocumentation("Config to control frequency of clustering");
+      .withDocumentation("Config to control frequency of inline clustering");
+
+  // Commit-count threshold for async clustering. HoodieWriteConfig#getAsyncClusterMaxCommits reads it,
+  // and SparkClusteringPlanActionExecutor skips scheduling a plan while async clustering is enabled
+  // and this value is greater than the number of commits since the last clustering.
+  public static final ConfigProperty<String> ASYNC_CLUSTERING_MAX_COMMIT_PROP 
= ConfigProperty
+      .key("hoodie.clustering.async.max.commits")
+      .defaultValue("4")
+      .sinceVersion("0.9.0")
+      .withDocumentation("Config to control frequency of async clustering");
 
   // Any strategy specific params can be saved with this prefix
   public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = 
"hoodie.clustering.plan.strategy.";
@@ -177,6 +183,11 @@ public class HoodieClusteringConfig extends HoodieConfig {
       return this;
     }
 
+    /**
+     * Sets the value stored under {@code ASYNC_CLUSTERING_MAX_COMMIT_PROP}.
+     *
+     * @param numCommits commit count to store, kept as its decimal string form
+     * @return this builder
+     */
+    public Builder withAsyncClusteringMaxCommits(int numCommits) {
+      final String commitsAsString = Integer.toString(numCommits);
+      clusteringConfig.setValue(ASYNC_CLUSTERING_MAX_COMMIT_PROP, commitsAsString);
+      return this;
+    }
+
     public Builder fromProperties(Properties props) {
       this.clusteringConfig.getProps().putAll(props);
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 1573c7f..20d2846 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -677,6 +677,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getInt(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP);
   }
 
+  /**
+   * @return the integer value configured for {@code HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP}
+   */
+  public int getAsyncClusterMaxCommits() {
+    final int asyncMaxCommits = getInt(HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP);
+    return asyncMaxCommits;
+  }
+
   public String getPayloadClass() {
     return getString(HoodieCompactionConfig.PAYLOAD_CLASS_PROP);
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java
new file mode 100644
index 0000000..ce436ba
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.AbstractClusteringClient;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.HoodieSparkClusteringClient;
+
+/**
+ * Async clustering service for Spark datasource.
+ * Uses the single-argument parent constructor, i.e. the service is not run in daemon mode.
+ */
+public class SparkAsyncClusteringService extends AsyncClusteringService {
+
+  // Declared explicitly, like the parent and SparkStreamingAsyncClusteringService, since the hierarchy is Serializable.
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param writeClient write client used to build the clustering client
+   */
+  public SparkAsyncClusteringService(AbstractHoodieWriteClient writeClient) {
+    super(writeClient);
+  }
+
+  /**
+   * @param client write client to wrap
+   * @return a {@link HoodieSparkClusteringClient} backed by {@code client}
+   */
+  @Override
+  protected AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client) {
+    return new HoodieSparkClusteringClient(client);
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java
new file mode 100644
index 0000000..884b555
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.IOException;
+
+/**
+ * Async clustering client for Spark datasource.
+ */
+public class HoodieSparkClusteringClient<T extends HoodieRecordPayload> extends
+    AbstractClusteringClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSparkClusteringClient.class);
+
+  /**
+   * @param clusteringClient write client used to run clustering; {@link #cluster} casts it to {@code SparkRDDWriteClient}
+   */
+  public HoodieSparkClusteringClient(
+      AbstractHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> clusteringClient) {
+    super(clusteringClient);
+  }
+
+  /**
+   * Runs clustering for the given instant through the Spark write client and logs an error
+   * if any returned write status reports errors. Write errors are only logged, not thrown.
+   *
+   * @param instant instant whose timestamp identifies the clustering to execute
+   * @throws IOException declared by the parent contract
+   */
+  @Override
+  public void cluster(HoodieInstant instant) throws IOException {
+    // Message fixed to say "instant", matching the terminology of every other clustering log line.
+    LOG.info("Executing clustering instant " + instant);
+    SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>) clusteringClient;
+    JavaRDD<WriteStatus> res = writeClient.cluster(instant.getTimestamp(), true).getWriteStatuses();
+    if (res != null && res.collect().stream().anyMatch(WriteStatus::hasErrors)) {
+      // TODO: Should we treat this fatal and throw exception?
+      LOG.error("Clustering for instant (" + instant + ") failed with write errors");
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
index 1f71aa4..683d852 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
@@ -58,13 +58,20 @@ public class SparkClusteringPlanActionExecutor<T extends 
HoodieRecordPayload> ex
     int commitsSinceLastClustering = 
table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
         
.findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"),
 Integer.MAX_VALUE)
         .countInstants();
-    if (config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
-      LOG.info("Not scheduling clustering as only " + 
commitsSinceLastClustering
+    if (config.inlineClusteringEnabled() && 
config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
+      LOG.info("Not scheduling inline clustering as only " + 
commitsSinceLastClustering
           + " commits was found since last clustering " + 
lastClusteringInstant + ". Waiting for "
           + config.getInlineClusterMaxCommits());
       return Option.empty();
     }
 
+    if (config.isAsyncClusteringEnabled() && 
config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) {
+      LOG.info("Not scheduling async clustering as only " + 
commitsSinceLastClustering
+          + " commits was found since last clustering " + 
lastClusteringInstant + ". Waiting for "
+          + config.getAsyncClusterMaxCommits());
+      return Option.empty();
+    }
+
     LOG.info("Generating clustering plan for table " + config.getBasePath());
     ClusteringPlanStrategy strategy = (ClusteringPlanStrategy)
         ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), 
table, context, config);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 1bf97c6..0d790be 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -206,4 +206,8 @@ public class ClusteringUtils {
     metrics.put(TOTAL_LOG_FILES, (double) numLogFiles);
     return metrics;
   }
+
+  /**
+   * Collects the instants of the active timeline that remain after {@code filterPendingReplaceTimeline()}.
+   * Despite the method name, the result holds {@link HoodieInstant} objects rather than timestamp strings.
+   *
+   * @param metaClient meta client whose active timeline is inspected
+   * @return list of the matching instants
+   */
+  public static List<HoodieInstant> 
getPendingClusteringInstantTimes(HoodieTableMetaClient metaClient) {
+    return 
metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 352a0ca..469f9c7 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
@@ -171,6 +172,8 @@ public class DataSourceUtils {
     boolean asyncCompact = 
Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key()));
     boolean inlineCompact = !asyncCompact && 
parameters.get(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY().key())
         .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
+    boolean asyncClusteringEnabled = 
Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key()));
+    boolean inlineClusteringEnabled = 
Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INLINE_CLUSTERING_ENABLE_OPT_KEY().key()));
     // insert/bulk-insert combining to be true, if filtering for duplicates
     boolean combineInserts = 
Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY().key()));
     HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
@@ -184,6 +187,9 @@ public class DataSourceUtils {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             
.withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY().key()))
             .withInlineCompaction(inlineCompact).build())
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withInlineClustering(inlineClusteringEnabled)
+            .withAsyncClustering(asyncClusteringEnabled).build())
         
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY().key()))
             .build())
         // override above with Hoodie configs specified as options.
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index ebb8c6b..ce36831 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -474,6 +474,18 @@ object DataSourceWriteOptions {
     .defaultValue("true")
     .withDocumentation("")
 
+  // Datasource-level switch for inline clustering. DataSourceUtils parses the value with
+  // Boolean.parseBoolean and passes it to HoodieClusteringConfig's withInlineClustering.
+  val INLINE_CLUSTERING_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.clustering.inline.enable")
+    .defaultValue("false")
+    .sinceVersion("0.9.0")
+    .withDocumentation("Enable inline clustering. Disabled by default.")
+
+  // Datasource-level switch for asynchronous clustering. DataSourceUtils parses the value with
+  // Boolean.parseBoolean and passes it to HoodieClusteringConfig's withAsyncClustering.
+  val ASYNC_CLUSTERING_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.clustering.async.enable")
+    .defaultValue("false")
+    .sinceVersion("0.9.0")
+    .withDocumentation("Enable asynchronous clustering. Disabled by default.")
+
   val KAFKA_AVRO_VALUE_DESERIALIZER_CLASS: ConfigProperty[String] = 
ConfigProperty
     .key("hoodie.deltastreamer.source.kafka.value.deserializer.class")
     .defaultValue("io.confluent.kafka.serializers.KafkaAvroDeserializer")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java
 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java
new file mode 100644
index 0000000..81d880e
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.AbstractClusteringClient;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.HoodieSparkClusteringClient;
+
+/**
+ * Async clustering service for Spark structured streaming.
+ * The service is started in daemon mode so that it does not block the Spark application from shutting down.
+ */
+public class SparkStreamingAsyncClusteringService extends AsyncClusteringService {
+
+  private static final long serialVersionUID = 1L;
+  // Structured streaming always runs the clustering thread as a daemon.
+  private static final boolean RUN_IN_DAEMON_MODE = true;
+
+  /**
+   * @param writeClient write client used to build the clustering client
+   */
+  public SparkStreamingAsyncClusteringService(AbstractHoodieWriteClient writeClient) {
+    super(writeClient, RUN_IN_DAEMON_MODE);
+  }
+
+  /**
+   * @param client write client to wrap
+   * @return a {@link HoodieSparkClusteringClient} backed by {@code client}
+   */
+  @Override
+  protected AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client) {
+    return new HoodieSparkClusteringClient(client);
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 76bc99b..b290533 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -62,6 +62,7 @@ object HoodieSparkSqlWriter {
   private val log = LogManager.getLogger(getClass)
   private var tableExists: Boolean = false
   private var asyncCompactionTriggerFnDefined: Boolean = false
+  private var asyncClusteringTriggerFnDefined: Boolean = false
 
   def write(sqlContext: SQLContext,
             mode: SaveMode,
@@ -69,9 +70,10 @@ object HoodieSparkSqlWriter {
             df: DataFrame,
             hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
             hoodieWriteClient: 
Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
-            asyncCompactionTriggerFn: 
Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = 
Option.empty
+            asyncCompactionTriggerFn: 
Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = 
Option.empty,
+            asyncClusteringTriggerFn: 
Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = 
Option.empty
            )
-  : (Boolean, common.util.Option[String], common.util.Option[String],
+  : (Boolean, common.util.Option[String], common.util.Option[String], 
common.util.Option[String],
     SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
 
     val sparkContext = sqlContext.sparkContext
@@ -79,6 +81,7 @@ object HoodieSparkSqlWriter {
     val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
     val tblNameOp = 
hoodieConfig.getStringOrThrow(HoodieWriteConfig.TABLE_NAME, 
s"'${HoodieWriteConfig.TABLE_NAME.key}' must be set.")
     asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
+    asyncClusteringTriggerFnDefined = asyncClusteringTriggerFn.isDefined
     if (path.isEmpty) {
       throw new HoodieException(s"'path' must be set.")
     }
@@ -112,7 +115,7 @@ object HoodieSparkSqlWriter {
 
     if (mode == SaveMode.Ignore && tableExists) {
       log.warn(s"hoodie table at $basePath already exists. Ignoring & not 
performing actual writes.")
-      (false, common.util.Option.empty(), common.util.Option.empty(), 
hoodieWriteClient.orNull, tableConfig)
+      (false, common.util.Option.empty(), common.util.Option.empty(), 
common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
     } else {
       // Handle various save modes
       handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs)
@@ -140,7 +143,7 @@ object HoodieSparkSqlWriter {
         operation == WriteOperationType.BULK_INSERT) {
         val (success, commitTime: common.util.Option[String]) = 
bulkInsertAsRow(sqlContext, parameters, df, tblName,
                                                                                
 basePath, path, instantTime)
-        return (success, commitTime, common.util.Option.empty(), 
hoodieWriteClient.orNull, tableConfig)
+        return (success, commitTime, common.util.Option.empty(), 
common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
       }
       // scalastyle:on
 
@@ -180,6 +183,10 @@ object HoodieSparkSqlWriter {
             asyncCompactionTriggerFn.get.apply(client)
           }
 
+          if (isAsyncClusteringEnabled(client, parameters)) {
+            asyncClusteringTriggerFn.get.apply(client)
+          }
+
           val hoodieRecords =
             if (hoodieConfig.getBoolean(INSERT_DROP_DUPS_OPT_KEY)) {
               DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, 
mapAsJavaMap(parameters))
@@ -219,6 +226,10 @@ object HoodieSparkSqlWriter {
             asyncCompactionTriggerFn.get.apply(client)
           }
 
+          if (isAsyncClusteringEnabled(client, parameters)) {
+            asyncClusteringTriggerFn.get.apply(client)
+          }
+
           // Issue deletes
           client.startCommitWithTime(instantTime, commitActionType)
           val writeStatuses = DataSourceUtils.doDeleteOperation(client, 
hoodieKeysToDelete, instantTime)
@@ -226,7 +237,7 @@ object HoodieSparkSqlWriter {
         }
 
       // Check for errors and commit the write.
-      val (writeSuccessful, compactionInstant) =
+      val (writeSuccessful, compactionInstant, clusteringInstant) =
         commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
           writeResult, parameters, writeClient, tableConfig, jsc,
           TableInstantInfo(basePath, instantTime, commitActionType, operation))
@@ -247,7 +258,7 @@ object HoodieSparkSqlWriter {
       // it's safe to unpersist cached rdds here
       unpersistRdd(writeResult.getWriteStatuses.rdd)
 
-      (writeSuccessful, common.util.Option.ofNullable(instantTime), 
compactionInstant, writeClient, tableConfig)
+      (writeSuccessful, common.util.Option.ofNullable(instantTime), 
compactionInstant, clusteringInstant, writeClient, tableConfig)
     }
   }
 
@@ -565,7 +576,7 @@ object HoodieSparkSqlWriter {
                                              tableConfig: HoodieTableConfig,
                                              jsc: JavaSparkContext,
                                              tableInstantInfo: TableInstantInfo
-                                             ): (Boolean, 
common.util.Option[java.lang.String]) = {
+                                             ): (Boolean, 
common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
     if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) {
       log.info("Proceeding to commit the write.")
       val metaMap = parameters.filter(kv =>
@@ -593,14 +604,24 @@ object HoodieSparkSqlWriter {
 
       log.info(s"Compaction Scheduled is $compactionInstant")
 
+      val asyncClusteringEnabled = isAsyncClusteringEnabled(client, parameters)
+      val clusteringInstant: common.util.Option[java.lang.String] =
+        if (asyncClusteringEnabled) {
+          client.scheduleClustering(common.util.Option.of(new 
util.HashMap[String, String](mapAsJavaMap(metaMap))))
+        } else {
+          common.util.Option.empty()
+        }
+
+      log.info(s"Clustering Scheduled is $clusteringInstant")
+
       val metaSyncSuccess = metaSync(spark, 
HoodieWriterUtils.convertMapToHoodieConfig(parameters),
         tableInstantInfo.basePath, schema)
 
       log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
-      if (!asyncCompactionEnabled) {
+      if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
         client.close()
       }
-      (commitSuccess && metaSyncSuccess, compactionInstant)
+      (commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant)
     } else {
       log.error(s"${tableInstantInfo.operation} failed with errors")
       if (log.isTraceEnabled) {
@@ -615,7 +636,7 @@ object HoodieSparkSqlWriter {
             }
           })
       }
-      (false, common.util.Option.empty())
+      (false, common.util.Option.empty(), common.util.Option.empty())
     }
   }
 
@@ -631,6 +652,13 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  private def isAsyncClusteringEnabled(client: 
SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
+                                       parameters: Map[String, String]) : 
Boolean = {
+    log.info(s"Config.asyncClusteringEnabled ? 
${client.getConfig.isAsyncClusteringEnabled}")
+    asyncClusteringTriggerFnDefined && 
client.getConfig.isAsyncClusteringEnabled &&
+      parameters.get(ASYNC_CLUSTERING_ENABLE_OPT_KEY.key).exists(r => 
r.toBoolean)
+  }
+
   private def getHoodieTableConfig(sparkContext: SparkContext,
                                    tablePath: String,
                                    hoodieTableConfigOpt: 
Option[HoodieTableConfig]): HoodieTableConfig = {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 61cad38..8dfcbc4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
 import java.lang
 import java.util.function.Function
 
-import org.apache.hudi.async.{AsyncCompactService, 
SparkStreamingAsyncCompactService}
+import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, 
SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.model.HoodieRecordPayload
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, 
HoodieTableMetaClient}
 import org.apache.hudi.common.table.timeline.HoodieInstant.State
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.util.CompactionUtils
+import org.apache.hudi.common.util.ClusteringUtils
 import org.apache.hudi.exception.HoodieCorruptedDataException
 import org.apache.log4j.LogManager
 import org.apache.spark.api.java.JavaSparkContext
@@ -52,6 +53,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
   private val ignoreFailedBatch = 
options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY.key).toBoolean
 
   private var isAsyncCompactorServiceShutdownAbnormally = false
+  private var isAsyncClusteringServiceShutdownAbnormally = false
 
   private val mode =
     if (outputMode == OutputMode.Append()) {
@@ -61,6 +63,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     }
 
   private var asyncCompactorService : AsyncCompactService = _
+  private var asyncClusteringService: AsyncClusteringService = _
   private var writeClient : 
Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty
   private var hoodieTableConfig : Option[HoodieTableConfig] = Option.empty
 
@@ -68,13 +71,17 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     if (isAsyncCompactorServiceShutdownAbnormally)  {
       throw new IllegalStateException("Async Compactor shutdown unexpectedly")
     }
+    if (isAsyncClusteringServiceShutdownAbnormally)  {
+      log.error("Async clustering service shutdown unexpectedly")
+      throw new IllegalStateException("Async clustering service shutdown 
unexpectedly")
+    }
 
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
-          sqlContext, mode, options, data, hoodieTableConfig, writeClient, 
Some(triggerAsyncCompactor))
+          sqlContext, mode, options, data, hoodieTableConfig, writeClient, 
Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
       ) match {
-        case Success((true, commitOps, compactionInstantOps, client, 
tableConfig)) =>
+        case Success((true, commitOps, compactionInstantOps, 
clusteringInstant, client, tableConfig)) =>
           log.info(s"Micro batch id=$batchId succeeded"
             + (commitOps.isPresent match {
                 case true => s" for commit=${commitOps.get()}"
@@ -83,9 +90,14 @@ class HoodieStreamingSink(sqlContext: SQLContext,
           writeClient = Some(client)
           hoodieTableConfig = Some(tableConfig)
           if (compactionInstantOps.isPresent) {
-            asyncCompactorService.enqueuePendingCompaction(
+            asyncCompactorService.enqueuePendingAsyncServiceInstant(
               new HoodieInstant(State.REQUESTED, 
HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get()))
           }
+          if (clusteringInstant.isPresent) {
+            asyncClusteringService.enqueuePendingAsyncServiceInstant(new 
HoodieInstant(
+              State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, 
clusteringInstant.get()
+            ))
+          }
           Success((true, commitOps, compactionInstantOps))
         case Failure(e) =>
           // clean up persist rdds in the write process
@@ -107,7 +119,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
             if (retryCnt > 1) log.info(s"Retrying the failed micro batch 
id=$batchId ...")
             Failure(e)
           }
-        case Success((false, commitOps, compactionInstantOps, client, 
tableConfig)) =>
+        case Success((false, commitOps, compactionInstantOps, 
clusteringInstant, client, tableConfig)) =>
           log.error(s"Micro batch id=$batchId ended up with errors"
             + (commitOps.isPresent match {
               case true =>  s" for commit=${commitOps.get()}"
@@ -179,7 +191,33 @@ class HoodieStreamingSink(sqlContext: SQLContext,
         .setBasePath(client.getConfig.getBasePath).build()
       val pendingInstants :java.util.List[HoodieInstant] =
         CompactionUtils.getPendingCompactionInstantTimes(metaClient)
-      pendingInstants.foreach((h : HoodieInstant) => 
asyncCompactorService.enqueuePendingCompaction(h))
+      pendingInstants.foreach((h : HoodieInstant) => 
asyncCompactorService.enqueuePendingAsyncServiceInstant(h))
+    }
+  }
+
+  protected def triggerAsyncClustering(client: 
SparkRDDWriteClient[HoodieRecordPayload[Nothing]]): Unit = {
+    if (null ==  asyncClusteringService) {
+      log.info("Triggering async clustering!")
+      asyncClusteringService = new SparkStreamingAsyncClusteringService(client)
+      asyncClusteringService.start(new Function[java.lang.Boolean, 
java.lang.Boolean] {
+        override def apply(errored: lang.Boolean): lang.Boolean = {
+          log.info(s"Async clustering service shutdown. Errored ? $errored")
+          isAsyncClusteringServiceShutdownAbnormally = errored
+          reset(false)
+          true
+        }
+      })
+
+      // Add Shutdown Hook
+      Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
+        override def run(): Unit = reset(true)
+      }))
+
+      // First time, scan .hoodie folder and get all pending clustering 
instants
+      val metaClient = 
HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration)
+        .setBasePath(client.getConfig.getBasePath).build()
+      val pendingInstants :java.util.List[HoodieInstant] = 
ClusteringUtils.getPendingClusteringInstantTimes(metaClient)
+      pendingInstants.foreach((h : HoodieInstant) => 
asyncClusteringService.enqueuePendingAsyncServiceInstant(h))
     }
   }
 
@@ -189,6 +227,11 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       asyncCompactorService = null
     }
 
+    if (asyncClusteringService != null) {
+      asyncClusteringService.shutdown(force)
+      asyncClusteringService = null
+    }
+
     if (writeClient.isDefined) {
       writeClient.get.close()
       writeClient = Option.empty
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 586e916..3056103 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -76,6 +76,8 @@ object HoodieWriterUtils {
       HIVE_CREATE_MANAGED_TABLE.key() -> 
HIVE_CREATE_MANAGED_TABLE.defaultValue.toString,
       HIVE_SYNC_AS_DATA_SOURCE_TABLE.key() -> 
HIVE_SYNC_AS_DATA_SOURCE_TABLE.defaultValue(),
       ASYNC_COMPACT_ENABLE_OPT_KEY.key -> 
ASYNC_COMPACT_ENABLE_OPT_KEY.defaultValue,
+      INLINE_CLUSTERING_ENABLE_OPT_KEY.key -> 
INLINE_CLUSTERING_ENABLE_OPT_KEY.defaultValue,
+      ASYNC_CLUSTERING_ENABLE_OPT_KEY.key -> 
ASYNC_CLUSTERING_ENABLE_OPT_KEY.defaultValue,
       ENABLE_ROW_WRITER_OPT_KEY.key -> ENABLE_ROW_WRITER_OPT_KEY.defaultValue
     ) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java 
b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
index d086c2e..966ffb0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -156,6 +156,7 @@ public class HoodieJavaApp {
             nonPartitionedTable ? 
NonpartitionedKeyGenerator.class.getCanonicalName()
                 : SimpleKeyGenerator.class.getCanonicalName())
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), 
"false")
+        
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true")
         // This will remove any existing data at path below, and create a
         .mode(SaveMode.Overwrite);
 
@@ -183,6 +184,7 @@ public class HoodieJavaApp {
                 : SimpleKeyGenerator.class.getCanonicalName()) // Add Key 
Extractor
         
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1")
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), 
"false")
+        
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true")
         .option(HoodieWriteConfig.TABLE_NAME.key(), 
tableName).mode(SaveMode.Append);
 
     updateHiveSyncConfig(writer);
@@ -210,6 +212,7 @@ public class HoodieJavaApp {
                 : SimpleKeyGenerator.class.getCanonicalName()) // Add Key 
Extractor
         
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1")
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), 
"false")
+        
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true")
         .option(HoodieWriteConfig.TABLE_NAME.key(), 
tableName).mode(SaveMode.Append);
 
     updateHiveSyncConfig(writer);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java 
b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index b0f0631..75fa91e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -362,6 +362,7 @@ public class HoodieJavaStreamingApp {
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY().key(), 
"timestamp")
         
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1")
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), 
"true")
+        
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true")
         .option(HoodieWriteConfig.TABLE_NAME.key(), 
tableName).option("checkpointLocation", checkpointLocation)
         .outputMode(OutputMode.Append());
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 4203882..66cb1ca 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -190,9 +190,13 @@ class TestStructuredStreaming extends HoodieClientTestBase 
{
     numInstants
   }
 
-  def getInlineClusteringOpts( isInlineClustering: String, 
clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = {
+  def getClusteringOpts(isInlineClustering: String, isAsyncClustering: String, 
isAsyncCompaction: String,
+                        clusteringNumCommit: String, fileMaxRecordNum: 
Int):Map[String, String] = {
     commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key -> 
isInlineClustering,
       HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key -> 
clusteringNumCommit,
+      DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key -> 
isAsyncClustering,
+      DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY.key -> 
isAsyncCompaction,
+      HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP.key -> 
clusteringNumCommit,
       HoodieStorageConfig.PARQUET_FILE_MAX_BYTES.key -> 
dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
     )
   }
@@ -207,12 +211,40 @@ class TestStructuredStreaming extends 
HoodieClientTestBase {
       metaClient.reloadActiveTimeline()
       assertEquals(1, 
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
     }
-    structuredStreamingForTestClusteringRunner(sourcePath, destPath, true,
+    structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, 
false, false,
       HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
checkClusteringResult)
   }
 
   @Test
-  def testStructuredStreamingWithoutInlineClustering(): Unit = {
+  def testStructuredStreamingWithAsyncClustering(): Unit = {
+    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", 
"dest")
+
+    def checkClusteringResult(destPath: String):Unit = {
+      // check that clustering was scheduled and completed, leaving a single file group
+      waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+      metaClient.reloadActiveTimeline()
+      assertEquals(1, 
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
+    }
+    structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, 
true, false,
+      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
checkClusteringResult)
+  }
+
+  @Test
+  def testStructuredStreamingWithAsyncClusteringAndCompaction(): Unit = {
+    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", 
"dest")
+
+    def checkClusteringResult(destPath: String):Unit = {
+      // check that clustering was scheduled and completed, leaving a single file group
+      waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+      metaClient.reloadActiveTimeline()
+      assertEquals(1, 
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
+    }
+    structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, 
true, true,
+      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
checkClusteringResult)
+  }
+
+  @Test
+  def testStructuredStreamingWithoutClustering(): Unit = {
     val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", 
"dest")
 
     def checkClusteringResult(destPath: String):Unit = {
@@ -224,12 +256,13 @@ class TestStructuredStreaming extends 
HoodieClientTestBase {
       }, msg)
       println(msg)
     }
-    structuredStreamingForTestClusteringRunner(sourcePath, destPath, false,
+    structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, 
false, false,
       HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
checkClusteringResult)
   }
 
-  def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: 
String,
-                                           isInlineClustering: Boolean, 
partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = {
+  def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: 
String, isInlineClustering: Boolean,
+                                                 isAsyncClustering: Boolean, 
isAsyncCompaction: Boolean,
+                                                 partitionOfRecords: String, 
checkClusteringResult: String => Unit): Unit = {
     // First insert of data
     val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 
100, partitionOfRecords)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
@@ -238,7 +271,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 
100, partitionOfRecords)).toList
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
 
-    val hudiOptions = getInlineClusteringOpts(isInlineClustering.toString, 
"2", 100)
+    val hudiOptions = getClusteringOpts(isInlineClustering.toString, 
isAsyncClustering.toString,
+      isAsyncCompaction.toString, "2", 100)
     val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, 
hudiOptions)
 
     val f2 = Future {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 2aad344..7742e8e 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -91,8 +92,10 @@ import java.util.stream.Collectors;
 import scala.collection.JavaConversions;
 
 import static 
org.apache.hudi.common.table.HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP;
+import static 
org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY;
 import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
 import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
+import static 
org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING_PROP;
 import static 
org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_PROP;
 import static 
org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP;
 import static 
org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP;
@@ -645,6 +648,9 @@ public class DeltaSync implements Serializable {
             
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName)
                 // Inline compaction is disabled for continuous mode. 
otherwise enabled for MOR
                 .withInlineCompaction(cfg.isInlineCompactionEnabled()).build())
+            .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+                .withInlineClustering(cfg.isInlineClusteringEnabled())
+                .withAsyncClustering(cfg.isAsyncClusteringEnabled()).build())
             
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
                 .build())
             .forTable(cfg.targetTableName)
@@ -663,6 +669,10 @@ public class DeltaSync implements Serializable {
     // Validate what deltastreamer assumes of write-config to be really safe
     ValidationUtils.checkArgument(config.inlineCompactionEnabled() == 
cfg.isInlineCompactionEnabled(),
         String.format("%s should be set to %s", INLINE_COMPACT_PROP.key(), 
cfg.isInlineCompactionEnabled()));
+    ValidationUtils.checkArgument(config.inlineClusteringEnabled() == 
cfg.isInlineClusteringEnabled(),
+        String.format("%s should be set to %s", INLINE_CLUSTERING_PROP.key(), 
cfg.isInlineClusteringEnabled()));
+    ValidationUtils.checkArgument(config.isAsyncClusteringEnabled() == 
cfg.isAsyncClusteringEnabled(),
+        String.format("%s should be set to %s", 
ASYNC_CLUSTERING_ENABLE_OPT_KEY.key(), cfg.isAsyncClusteringEnabled()));
     ValidationUtils.checkArgument(!config.shouldAutoCommit(),
         String.format("%s should be set to %s", HOODIE_AUTO_COMMIT_PROP.key(), 
autoCommit));
     ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == 
cfg.filterDupes,
@@ -736,4 +746,14 @@ public class DeltaSync implements Serializable {
   public Option<HoodieTimeline> getCommitTimelineOpt() {
     return commitTimelineOpt;
   }
+
+  /**
+   * Schedules clustering through the write client on every call; not a plain accessor despite its name.
+   * Called from {@link HoodieDeltaStreamer} when async clustering is enabled.
+   *
+   * @return Requested clustering instant.
+   */
+  public Option<String> getClusteringInstantOpt() {
+    return writeClient.scheduleClustering(Option.empty());
+  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index a1066c7..9734a1d 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -18,8 +18,10 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
-import org.apache.hudi.async.HoodieAsyncService;
+import org.apache.hudi.async.AsyncClusteringService;
 import org.apache.hudi.async.AsyncCompactService;
+import org.apache.hudi.async.HoodieAsyncService;
+import org.apache.hudi.async.SparkAsyncClusteringService;
 import org.apache.hudi.async.SparkAsyncCompactService;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -35,15 +37,17 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.utilities.IdentitySplitter;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.utilities.HiveIncrementalPuller;
+import org.apache.hudi.utilities.IdentitySplitter;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -282,6 +286,11 @@ public class HoodieDeltaStreamer implements Serializable {
             + "outstanding compactions is less than this number")
     public Integer maxPendingCompactions = 5;
 
+    @Parameter(names = {"--max-pending-clustering"},
+        description = "Maximum number of outstanding inflight/requested 
clustering. Delta Sync will not happen unless "
+            + "outstanding clustering is less than this number")
+    public Integer maxPendingClustering = 5;
+
     @Parameter(names = {"--continuous"}, description = "Delta Streamer runs in 
continuous mode running"
         + " source-fetch -> Transform -> Hudi Write in loop")
     public Boolean continuousMode = false;
@@ -351,6 +360,16 @@ public class HoodieDeltaStreamer implements Serializable {
           && 
HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
     }
 
+    public boolean isAsyncClusteringEnabled() {
+      return 
Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getConfig()
+          
.getOrDefault(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key(), 
false)));
+    }
+
+    public boolean isInlineClusteringEnabled() {
+      return 
Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getConfig()
+          .getOrDefault(HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), 
false)));
+    }
+
     @Override
     public boolean equals(Object o) {
       if (this == o) {
@@ -376,6 +395,7 @@ public class HoodieDeltaStreamer implements Serializable {
               && Objects.equals(filterDupes, config.filterDupes)
               && Objects.equals(enableHiveSync, config.enableHiveSync)
               && Objects.equals(maxPendingCompactions, 
config.maxPendingCompactions)
+              && Objects.equals(maxPendingClustering, 
config.maxPendingClustering)
               && Objects.equals(continuousMode, config.continuousMode)
               && Objects.equals(minSyncIntervalSeconds, 
config.minSyncIntervalSeconds)
               && Objects.equals(sparkMaster, config.sparkMaster)
@@ -396,7 +416,7 @@ public class HoodieDeltaStreamer implements Serializable {
               baseFileFormat, propsFilePath, configs, sourceClassName,
               sourceOrderingField, payloadClassName, schemaProviderClassName,
               transformerClassNames, sourceLimit, operation, filterDupes,
-              enableHiveSync, maxPendingCompactions, continuousMode,
+              enableHiveSync, maxPendingCompactions, maxPendingClustering, 
continuousMode,
               minSyncIntervalSeconds, sparkMaster, commitOnErrors,
               deltaSyncSchedulingWeight, compactSchedulingWeight, 
deltaSyncSchedulingMinShare,
               compactSchedulingMinShare, forceDisableCompaction, checkpoint,
@@ -422,6 +442,7 @@ public class HoodieDeltaStreamer implements Serializable {
               + ", filterDupes=" + filterDupes
               + ", enableHiveSync=" + enableHiveSync
               + ", maxPendingCompactions=" + maxPendingCompactions
+              + ", maxPendingClustering=" + maxPendingClustering
               + ", continuousMode=" + continuousMode
               + ", minSyncIntervalSeconds=" + minSyncIntervalSeconds
               + ", sparkMaster='" + sparkMaster + '\''
@@ -520,6 +541,11 @@ public class HoodieDeltaStreamer implements Serializable {
     private Option<AsyncCompactService> asyncCompactService;
 
     /**
+     * Async clustering service.
+     */
+    private Option<AsyncClusteringService> asyncClusteringService;
+
+    /**
      * Table Type.
      */
     private final HoodieTableType tableType;
@@ -535,6 +561,7 @@ public class HoodieDeltaStreamer implements Serializable {
       this.jssc = jssc;
       this.sparkSession = 
SparkSession.builder().config(jssc.getConf()).getOrCreate();
       this.asyncCompactService = Option.empty();
+      this.asyncClusteringService = Option.empty();
 
       if (fs.exists(new Path(cfg.targetBasePath))) {
         HoodieTableMetaClient meta =
@@ -598,9 +625,17 @@ public class HoodieDeltaStreamer implements Serializable {
               Option<Pair<Option<String>, JavaRDD<WriteStatus>>> 
scheduledCompactionInstantAndRDD = Option.ofNullable(deltaSync.syncOnce());
               if (scheduledCompactionInstantAndRDD.isPresent() && 
scheduledCompactionInstantAndRDD.get().getLeft().isPresent()) {
                 LOG.info("Enqueuing new pending compaction instant (" + 
scheduledCompactionInstantAndRDD.get().getLeft() + ")");
-                asyncCompactService.get().enqueuePendingCompaction(new 
HoodieInstant(State.REQUESTED,
+                
asyncCompactService.get().enqueuePendingAsyncServiceInstant(new 
HoodieInstant(State.REQUESTED,
                     HoodieTimeline.COMPACTION_ACTION, 
scheduledCompactionInstantAndRDD.get().getLeft().get()));
-                
asyncCompactService.get().waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
+                
asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions);
+              }
+              if (cfg.isAsyncClusteringEnabled()) {
+                Option<String> clusteringInstant = 
deltaSync.getClusteringInstantOpt();
+                if (clusteringInstant.isPresent()) {
+                  LOG.info("Scheduled async clustering for instant: " + 
clusteringInstant.get());
+                  
asyncClusteringService.get().enqueuePendingAsyncServiceInstant(new 
HoodieInstant(State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, 
clusteringInstant.get()));
+                  
asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering);
+                }
               }
               long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - 
(System.currentTimeMillis() - start);
               if (toSleepMs > 0) {
@@ -615,21 +650,25 @@ public class HoodieDeltaStreamer implements Serializable {
             }
           }
         } finally {
-          shutdownCompactor(error);
+          shutdownAsyncServices(error);
         }
         return true;
       }, executor), executor);
     }
 
     /**
-     * Shutdown compactor as DeltaSync is shutdown.
+     * Shutdown async services like compaction/clustering as DeltaSync is shutdown.
+     *
+     * @param error flag that is only written to the log here; each service that is present
+     *              is asked to shut down with {@code shutdown(false)} regardless of its value
      */
-    private void shutdownCompactor(boolean error) {
+    private void shutdownAsyncServices(boolean error) {
       LOG.info("Delta Sync shutdown. Error ?" + error);
       if (asyncCompactService.isPresent()) {
         LOG.warn("Gracefully shutting down compactor");
         asyncCompactService.get().shutdown(false);
       }
+      if (asyncClusteringService.isPresent()) {
+        LOG.warn("Gracefully shutting down clustering service");
+        asyncClusteringService.get().shutdown(false);
+      }
     }
 
     /**
@@ -649,19 +688,43 @@ public class HoodieDeltaStreamer implements Serializable {
           HoodieTableMetaClient meta =
               HoodieTableMetaClient.builder().setConf(new 
Configuration(jssc.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build();
           List<HoodieInstant> pending = 
CompactionUtils.getPendingCompactionInstantTimes(meta);
-          pending.forEach(hoodieInstant -> 
asyncCompactService.get().enqueuePendingCompaction(hoodieInstant));
+          pending.forEach(hoodieInstant -> 
asyncCompactService.get().enqueuePendingAsyncServiceInstant(hoodieInstant));
           asyncCompactService.get().start((error) -> {
             // Shutdown DeltaSync
             shutdown(false);
             return true;
           });
           try {
-            
asyncCompactService.get().waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
+            
asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions);
           } catch (InterruptedException ie) {
             throw new HoodieException(ie);
           }
         }
       }
+      // start async clustering if required
+      if (cfg.isAsyncClusteringEnabled()) {
+        if (asyncClusteringService.isPresent()) {
+          asyncClusteringService.get().updateWriteClient(writeClient);
+        } else {
+          asyncClusteringService = Option.ofNullable(new 
SparkAsyncClusteringService(writeClient));
+          HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
+              .setConf(new Configuration(jssc.hadoopConfiguration()))
+              .setBasePath(cfg.targetBasePath)
+              .setLoadActiveTimelineOnLoad(true).build();
+          List<HoodieInstant> pending = 
ClusteringUtils.getPendingClusteringInstantTimes(meta);
+          LOG.info(String.format("Found %d pending clustering instants ", 
pending.size()));
+          pending.forEach(hoodieInstant -> 
asyncClusteringService.get().enqueuePendingAsyncServiceInstant(hoodieInstant));
+          asyncClusteringService.get().start((error) -> {
+            shutdown(false);
+            return true;
+          });
+          try {
+            
asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering);
+          } catch (InterruptedException e) {
+            throw new HoodieException(e);
+          }
+        }
+      }
       return true;
     }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index eb534c4..ddf03cb 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -193,6 +193,7 @@ public class HoodieMultiTableDeltaStreamer {
       tableConfig.payloadClassName = globalConfig.payloadClassName;
       tableConfig.forceDisableCompaction = globalConfig.forceDisableCompaction;
       tableConfig.maxPendingCompactions = globalConfig.maxPendingCompactions;
+      tableConfig.maxPendingClustering = globalConfig.maxPendingClustering;
       tableConfig.minSyncIntervalSeconds = globalConfig.minSyncIntervalSeconds;
       tableConfig.transformerClassNames = globalConfig.transformerClassNames;
       tableConfig.commitOnErrors = globalConfig.commitOnErrors;
@@ -296,6 +297,11 @@ public class HoodieMultiTableDeltaStreamer {
         + "outstanding compactions is less than this number")
     public Integer maxPendingCompactions = 5;
 
+    // Global limit on pending clustering instants; populateTableExecutionContextList-style setup
+    // copies it onto each table's config (tableConfig.maxPendingClustering = globalConfig.maxPendingClustering).
+    // The second literal starts with a space so the concatenated help text keeps its words apart.
+    @Parameter(names = {"--max-pending-clustering"},
+        description = "Maximum number of outstanding inflight/requested clustering. Delta Sync will not happen unless"
+            + " outstanding clustering is less than this number")
+    public Integer maxPendingClustering = 5;
+
     @Parameter(names = {"--continuous"}, description = "Delta Streamer runs in 
continuous mode running"
         + " source-fetch -> Transform -> Hudi Write in loop")
     public Boolean continuousMode = false;
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 8a2648d..642c666 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -500,6 +501,14 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
       int numDeltaCommits = (int) timeline.getInstants().count();
       assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
     }
+
+    /**
+     * Asserts that the completed replace timeline of the table at {@code tablePath} holds at least
+     * {@code minExpected} instants. The full active timeline is logged first to ease debugging.
+     */
+    static void assertAtLeastNReplaceCommits(int minExpected, String tablePath, FileSystem fs) {
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+          .setConf(fs.getConf())
+          .setBasePath(tablePath)
+          .setLoadActiveTimelineOnLoad(true)
+          .build();
+      HoodieTimeline completedReplaceTimeline = metaClient.getActiveTimeline().getCompletedReplaceTimeline();
+      LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants().collect(Collectors.toList()));
+      int numReplaceCommits = (int) completedReplaceTimeline.getInstants().count();
+      assertTrue(minExpected <= numReplaceCommits, "Got=" + numReplaceCommits + ", exp >=" + minExpected);
+    }
   }
 
   @Test
@@ -987,20 +996,35 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
-    cfg.configs.add(String.format("%s=%d", 
SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfg.configs.add(String.format("%s=false", 
HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
-    cfg.configs.add(String.format("%s=%s", 
HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), "true"));
-    cfg.configs.add(String.format("%s=%s", 
HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key(), "2"));
+    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", 
"2", "", ""));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
-      HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
-      int pendingReplaceSize = 
metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
-      int completeReplaceSize = 
metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
-      LOG.info("PendingReplaceSize=" + pendingReplaceSize + 
",completeReplaceSize = " + completeReplaceSize);
-      return completeReplaceSize > 0;
+      TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+      return true;
     });
-    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
-    assertEquals(1, 
metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length);
+  }
+
+  /**
+   * Builds the {@code key=value} overrides used by the clustering tests.
+   *
+   * <p>The max-unique-records entry is always emitted; every other entry is emitted only when its
+   * argument is neither null nor empty, in the order the parameters are declared.
+   */
+  private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster,
+                                               String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) {
+    // Pair each optional property key with the caller-supplied value, preserving emission order.
+    String[][] optionalSettings = {
+        {HoodieCompactionConfig.AUTO_CLEAN_PROP.key(), autoClean},
+        {HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), inlineCluster},
+        {HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key(), inlineClusterMaxCommit},
+        {HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key(), asyncCluster},
+        {HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP.key(), asyncClusterMaxCommit}
+    };
+    List<String> configs = new ArrayList<>();
+    configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
+    for (String[] setting : optionalSettings) {
+      if (!StringUtils.isNullOrEmpty(setting[1])) {
+        configs.add(String.format("%s=%s", setting[0], setting[1]));
+      }
+    }
+    return configs;
   }
 
   private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String 
basePath,
@@ -1023,9 +1047,7 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
-    cfg.configs.add(String.format("%s=%d", 
SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfg.configs.add(String.format("%s=false", 
HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
-    cfg.configs.add(String.format("%s=true", 
HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key()));
+    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", 
"true", ""));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
@@ -1049,14 +1071,48 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
       } else {
         LOG.warn("Schedule clustering failed");
       }
-      HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
-      int pendingReplaceSize = 
metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
-      int completeReplaceSize = 
metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
-      System.out.println("PendingReplaceSize=" + pendingReplaceSize + 
",completeReplaceSize = " + completeReplaceSize);
-      return completeReplaceSize > 0;
+      TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+      return true;
+    });
+  }
+
+  /**
+   * Runs the delta streamer in continuous mode on a COPY_ON_WRITE table with async clustering
+   * enabled (async max-commits set to "2", auto clean off) and checks, through the condition
+   * handed to {@code deltaStreamerTestRunner}, for at least 2 commits and at least 2 completed
+   * replace commits.
+   */
+  @Test
+  public void testAsyncClusteringService() throws Exception {
+    // Keep it higher than batch-size to test continuous mode
+    final int recordCount = 3000;
+    final String basePath = dfsBasePath + "/asyncClustering";
+
+    // Continuous-mode INSERT config with async clustering turned on
+    HoodieDeltaStreamer.Config streamerConfig = TestHelpers.makeConfig(basePath, WriteOperationType.INSERT);
+    streamerConfig.continuousMode = true;
+    streamerConfig.tableType = HoodieTableType.COPY_ON_WRITE.name();
+    streamerConfig.configs.addAll(getAsyncServicesConfigs(recordCount, "false", "", "", "true", "2"));
+
+    HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(streamerConfig, jsc);
+    deltaStreamerTestRunner(streamer, streamerConfig, (r) -> {
+      TestHelpers.assertAtLeastNCommits(2, basePath, dfs);
+      TestHelpers.assertAtLeastNReplaceCommits(2, basePath, dfs);
+      return true;
+    });
+  }
+
+  /**
+   * Same scenario as the async clustering test but on a MERGE_ON_READ table: the condition handed
+   * to {@code deltaStreamerTestRunner} checks for at least 2 commits, at least 2 compaction commits
+   * (per the helper's name -- its body is not shown here) and at least 2 completed replace commits.
+   */
+  @Test
+  public void testAsyncClusteringServiceWithCompaction() throws Exception {
+    String tableBasePath = dfsBasePath + "/asyncClusteringCompaction";
+    // Keep it higher than batch-size to test continuous mode
+    int totalRecords = 3000;
+
+    // Continuous-mode INSERT config with async clustering turned on (async max-commits = "2", auto clean off)
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT);
+    cfg.continuousMode = true;
+    cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
+    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", 
"true", "2"));
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    deltaStreamerTestRunner(ds, cfg, (r) -> {
+      TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+      TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
+      TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+      return true;
     });
-    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
-    assertEquals(1, 
metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length);
   }
 
   /**

Reply via email to