This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8919be6  [HUDI-855] Run Cleaner async with writing (#1577)
8919be6 is described below

commit 8919be6a5d8038db7265bfd7459d72fbd545f133
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Sun Jun 28 02:04:50 2020 -0700

    [HUDI-855] Run Cleaner async with writing (#1577)
    
    - Cleaner can now run concurrently with write operation
    - Configs to turn on/off
    
    Co-authored-by: Vinoth Chandar <[email protected]>
---
 .../hudi/cli/commands/TestCleansCommand.java       |  28 +++---
 .../apache/hudi/cli/integ/ITTestCleansCommand.java | 106 ---------------------
 .../HoodieTestCommitMetadataGenerator.java         |  20 +++-
 .../apache/hudi/async/AbstractAsyncService.java    |  22 +++--
 .../apache/hudi/client/AsyncCleanerService.java    |  85 +++++++++++++++++
 .../org/apache/hudi/client/HoodieWriteClient.java  |  50 +++++++---
 .../apache/hudi/config/HoodieCompactionConfig.java |  10 ++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +
 .../table/action/clean/CleanActionExecutor.java    |   5 +-
 .../java/org/apache/hudi/table/TestCleaner.java    |  50 +++++-----
 .../deltastreamer/HoodieDeltaStreamer.java         |   5 +-
 11 files changed, 207 insertions(+), 178 deletions(-)

diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java
index 69aa5b3..c14cf0b 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java
@@ -26,7 +26,6 @@ import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
 import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -36,6 +35,8 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.shell.core.CommandResult;
@@ -43,11 +44,10 @@ import org.springframework.shell.core.CommandResult;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.UUID;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -77,6 +77,10 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
     Configuration conf = HoodieCLI.conf;
 
     metaClient = HoodieCLI.getTableMetaClient();
+    String fileId1 = UUID.randomUUID().toString();
+    String fileId2 = UUID.randomUUID().toString();
+    HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, tablePath);
+
     // Create four commits
     for (int i = 100; i < 104; i++) {
       String timestamp = String.valueOf(i);
@@ -86,7 +90,8 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
       // Inflight Compaction
       HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
           new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
-      HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf);
+      HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf, fileId1, fileId2,
+          Option.empty(), Option.empty());
     }
 
     metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -103,9 +108,6 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
     assertNotNull(propsFilePath, "Not found properties file");
 
     // First, run clean
-    Files.createFile(Paths.get(tablePath,
-        HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH,
-        HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
     SparkMain.clean(jsc, HoodieCLI.basePath, propsFilePath.getPath(), new ArrayList<>());
     assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(),
         "Loaded 1 clean and the count should match");
@@ -125,7 +127,7 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
 
     // EarliestCommandRetained should be 102, since hoodie.cleaner.commits.retained=2
     // Total Time Taken need read from metadata
-    rows.add(new Comparable[] {clean.getTimestamp(), "102", "0", getLatestCleanTimeTakenInMillis().toString()});
+    rows.add(new Comparable[] {clean.getTimestamp(), "102", "2", getLatestCleanTimeTakenInMillis().toString()});
 
     String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
     expected = removeNonWordAndStripSpace(expected);
@@ -142,12 +144,6 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
     assertNotNull(propsFilePath, "Not found properties file");
 
     // First, run clean with two partition
-    Files.createFile(Paths.get(tablePath,
-        HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH,
-        HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-    Files.createFile(Paths.get(tablePath,
-        HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH,
-        HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
     SparkMain.clean(jsc, HoodieCLI.basePath, propsFilePath.toString(), new ArrayList<>());
     assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(),
         "Loaded 1 clean and the count should match");
@@ -165,9 +161,11 @@ public class TestCleansCommand extends AbstractShellIntegrationTest {
     // There should be two partition path
     List<Comparable[]> rows = new ArrayList<>();
     rows.add(new Comparable[] {HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "1", "0"});
+    rows.add(new Comparable[] {HoodieTestCommitMetadataGenerator.DEFAULT_THIRD_PARTITION_PATH,
         HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "0", "0"});
     rows.add(new Comparable[] {HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH,
-        HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "0", "0"});
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "1", "0"});
 
     String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
     expected = removeNonWordAndStripSpace(expected);
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCleansCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCleansCommand.java
deleted file mode 100644
index 1f6f6c7..0000000
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCleansCommand.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.cli.integ;
-
-import org.apache.hudi.cli.HoodieCLI;
-import org.apache.hudi.cli.commands.TableCommand;
-import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
-import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.springframework.shell.core.CommandResult;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class ITTestCleansCommand extends AbstractShellIntegrationTest {
-
-  private String tablePath;
-  private URL propsFilePath;
-
-  @BeforeEach
-  public void init() throws IOException {
-    HoodieCLI.conf = jsc.hadoopConfiguration();
-
-    String tableName = "test_table";
-    tablePath = basePath + File.separator + tableName;
-    propsFilePath = this.getClass().getClassLoader().getResource("clean.properties");
-
-    // Create table and connect
-    new TableCommand().createTable(
-        tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
-        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
-
-    Configuration conf = HoodieCLI.conf;
-
-    metaClient = HoodieCLI.getTableMetaClient();
-    // Create four commits
-    for (int i = 100; i < 104; i++) {
-      String timestamp = String.valueOf(i);
-      // Requested Compaction
-      HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
-          new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
-      // Inflight Compaction
-      HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
-          new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
-      HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf);
-    }
-  }
-
-  /**
-   * Test case for cleans run.
-   */
-  @Test
-  public void testRunClean() throws IOException {
-    // First, there should none of clean instant.
-    assertEquals(0, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count());
-
-    // Check properties file exists.
-    assertNotNull(propsFilePath, "Not found properties file");
-
-    // Create partition metadata
-    Files.createFile(Paths.get(tablePath,
-        HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH,
-        HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-    Files.createFile(Paths.get(tablePath,
-        HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH,
-        HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-
-    CommandResult cr = getShell().executeCommand("cleans run --sparkMaster local --propsFilePath " + propsFilePath.toString());
-    assertTrue(cr.isSuccess());
-
-    // After run clean, there should have 1 clean instant
-    assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(),
-        "Loaded 1 clean and the count should match");
-  }
-}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
index bdf623e..94904c5 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.cli.testutils;
 
+import java.util.UUID;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -67,6 +68,12 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
 
   public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
       Option<Integer> writes, Option<Integer> updates) {
+    createCommitFileWithMetadata(basePath, commitTime, configuration, UUID.randomUUID().toString(),
+        UUID.randomUUID().toString(), writes, updates);
+  }
+
+  public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
+      String fileId1, String fileId2, Option<Integer> writes, Option<Integer> updates) {
     Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
         HoodieTimeline.makeRequestedCommitFileName(commitTime))
         .forEach(f -> {
@@ -77,7 +84,8 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
             FileSystem fs = FSUtils.getFs(basePath, configuration);
             os = fs.create(commitFile, true);
             // Generate commitMetadata
-            HoodieCommitMetadata commitMetadata = generateCommitMetadata(basePath, commitTime, writes, updates);
+            HoodieCommitMetadata commitMetadata =
+                generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates);
             // Write empty commit metadata
             os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
           } catch (IOException ioe) {
@@ -103,8 +111,14 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
 
   public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime,
       Option<Integer> writes, Option<Integer> updates) throws IOException {
-    String file1P0C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime);
-    String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime);
+    return generateCommitMetadata(basePath, commitTime, UUID.randomUUID().toString(), UUID.randomUUID().toString(),
+        writes, updates);
+  }
+
+  public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime, String fileId1,
+      String fileId2, Option<Integer> writes, Option<Integer> updates) throws IOException {
+    String file1P0C0 = HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime, fileId1);
+    String file1P1C0 = HoodieTestUtils.createDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime, fileId2);
     return generateCommitMetadata(new HashMap<String, List<String>>() {
       {
         put(DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java b/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
similarity index 88%
rename from hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java
rename to hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
index 8fe1a71..7ac236d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java
+++ b/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.utilities.deltastreamer;
+package org.apache.hudi.async;
 
 import org.apache.hudi.common.util.collection.Pair;
 
@@ -32,11 +32,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 /**
- * Base Class for running delta-sync/compaction in separate thread and controlling their life-cycle.
+ * Base Class for running clean/delta-sync/compaction in separate thread and controlling their life-cycle.
  */
-public abstract class AbstractDeltaStreamerService implements Serializable {
+public abstract class AbstractAsyncService implements Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(AbstractDeltaStreamerService.class);
+  private static final Logger LOG = LogManager.getLogger(AbstractAsyncService.class);
 
   // Flag to track if the service is started.
   private boolean started;
@@ -49,15 +49,15 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
   // Future tracking delta-sync/compaction
   private transient CompletableFuture future;
 
-  AbstractDeltaStreamerService() {
+  protected AbstractAsyncService() {
     shutdownRequested = false;
   }
 
-  boolean isShutdownRequested() {
+  protected boolean isShutdownRequested() {
     return shutdownRequested;
   }
 
-  boolean isShutdown() {
+  protected boolean isShutdown() {
     return shutdown;
   }
 
@@ -67,7 +67,7 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
    * @throws ExecutionException
    * @throws InterruptedException
    */
-  void waitForShutdown() throws ExecutionException, InterruptedException {
+  public void waitForShutdown() throws ExecutionException, InterruptedException {
     try {
       future.get();
     } catch (ExecutionException ex) {
@@ -82,7 +82,7 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
    * 
    * @param force Forcefully shutdown
    */
-  void shutdown(boolean force) {
+  public void shutdown(boolean force) {
     if (!shutdownRequested || force) {
       shutdownRequested = true;
       if (executor != null) {
@@ -145,7 +145,9 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
       } finally {
         // Mark as shutdown
         shutdown = true;
-        onShutdownCallback.apply(error);
+        if (null != onShutdownCallback) {
+          onShutdownCallback.apply(error);
+        }
       }
     });
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java b/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
new file mode 100644
index 0000000..6367e79
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.async.AbstractAsyncService;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Clean service running concurrently with write operation.
+ */
+class AsyncCleanerService extends AbstractAsyncService {
+
+  private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class);
+
+  private final HoodieWriteClient<?> writeClient;
+  private final String cleanInstantTime;
+  private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
+
+  protected AsyncCleanerService(HoodieWriteClient<?> writeClient, String cleanInstantTime) {
+    this.writeClient = writeClient;
+    this.cleanInstantTime = cleanInstantTime;
+  }
+
+  @Override
+  protected Pair<CompletableFuture, ExecutorService> startService() {
+    return Pair.of(CompletableFuture.supplyAsync(() -> {
+      writeClient.clean(cleanInstantTime);
+      return true;
+    }), executor);
+  }
+
+  public static AsyncCleanerService startAsyncCleaningIfEnabled(HoodieWriteClient writeClient,
+                                                                String instantTime) {
+    AsyncCleanerService asyncCleanerService = null;
+    if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
+      LOG.info("Auto cleaning is enabled. Running cleaner async to write operation");
+      asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
+      asyncCleanerService.start(null);
+    } else {
+      LOG.info("Auto cleaning is not enabled. Not running cleaner now");
+    }
+    return asyncCleanerService;
+  }
+
+  public static void waitForCompletion(AsyncCleanerService asyncCleanerService) {
+    if (asyncCleanerService != null) {
+      LOG.info("Waiting for async cleaner to finish");
+      try {
+        asyncCleanerService.waitForShutdown();
+      } catch (Exception e) {
+        throw new HoodieException("Error waiting for async cleaning to finish", e);
+      }
+    }
+  }
+
+  public static void forceShutdown(AsyncCleanerService asyncCleanerService) {
+    if (asyncCleanerService != null) {
+      LOG.info("Shutting down async cleaner");
+      asyncCleanerService.shutdown(true);
+    }
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 8f562ea..a6d9d0d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -80,6 +80,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   private final boolean rollbackPending;
   private final transient HoodieMetrics metrics;
   private transient Timer.Context compactionTimer;
+  private transient AsyncCleanerService asyncCleanerService;
 
   /**
    * Create a write client, without cleaning up failed/inflight commits.
@@ -95,28 +96,28 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * Create a write client, with new hudi index.
    *
    * @param jsc Java Spark Context
-   * @param clientConfig instance of HoodieWriteConfig
+   * @param writeConfig instance of HoodieWriteConfig
    * @param rollbackPending whether need to cleanup pending commits
    */
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending) {
-    this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig));
+  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending) {
+    this(jsc, writeConfig, rollbackPending, HoodieIndex.createIndex(writeConfig));
   }
 
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {
-    this(jsc, clientConfig, rollbackPending, index, Option.empty());
+  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending, HoodieIndex index) {
+    this(jsc, writeConfig, rollbackPending, index, Option.empty());
   }
 
   /**
    *  Create a write client, allows to specify all parameters.
    *
    * @param jsc Java Spark Context
-   * @param clientConfig instance of HoodieWriteConfig
+   * @param writeConfig instance of HoodieWriteConfig
    * @param rollbackPending whether need to cleanup pending commits
    * @param timelineService Timeline Service that runs as part of write client.
    */
-  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending,
+  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending,
       HoodieIndex index, Option<EmbeddedTimelineService> timelineService) {
-    super(jsc, index, clientConfig, timelineService);
+    super(jsc, index, writeConfig, timelineService);
     this.metrics = new HoodieMetrics(config, config.getTableName());
     this.rollbackPending = rollbackPending;
   }
@@ -158,6 +159,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
     table.validateUpsertSchema();
     setOperationType(WriteOperationType.UPSERT);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
     HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -178,6 +180,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED);
     table.validateUpsertSchema();
     setOperationType(WriteOperationType.UPSERT_PREPPED);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
     HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
   }
@@ -196,6 +199,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
     HoodieWriteMetadata result = table.insert(jsc,instantTime, records);
     return postWrite(result, instantTime, table);
   }
@@ -215,6 +219,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT_PREPPED);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
     HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
   }
@@ -254,6 +259,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.BULK_INSERT);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
     HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, bulkInsertPartitioner);
     return postWrite(result, instantTime, table);
   }
@@ -279,6 +285,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
     HoodieWriteMetadata result = table.bulkInsertPrepped(jsc,instantTime, preppedRecords, bulkInsertPartitioner);
     return postWrite(result, instantTime, table);
   }
@@ -338,15 +345,27 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       // We cannot have unbounded commit files. Archive commits if we have to archive
       HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, createMetaClient(true));
       archiveLog.archiveIfRequired(hadoopConf);
-      if (config.isAutoClean()) {
-        // Call clean to cleanup if there is anything to cleanup after the commit,
+      autoCleanOnCommit(instantTime);
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  /**
+   * Handle auto clean during commit.
+   * @param instantTime
+   */
+  private void autoCleanOnCommit(String instantTime) {
+    if (config.isAutoClean()) {
+      // Call clean to cleanup if there is anything to cleanup after the commit,
+      if (config.isAsyncClean()) {
+        LOG.info("Cleaner has been spawned already. Waiting for it to finish");
+        AsyncCleanerService.waitForCompletion(asyncCleanerService);
+        LOG.info("Cleaner has finished");
+      } else {
         LOG.info("Auto cleaning is enabled. Running cleaner now");
         clean(instantTime);
-      } else {
-        LOG.info("Auto cleaning is not enabled. Not running cleaner now");
       }
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
     }
   }
 
@@ -477,7 +496,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   @Override
   public void close() {
-    // Stop timeline-server if running
+    AsyncCleanerService.forceShutdown(asyncCleanerService);
+    asyncCleanerService = null;
     super.close();
   }
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 4ec0485..1b993b1 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -40,6 +40,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
 
   public static final String CLEANER_POLICY_PROP = "hoodie.cleaner.policy";
   public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic";
+  public static final String ASYNC_CLEAN_PROP = "hoodie.clean.async";
+
   // Turn on inline compaction - after fw delta commits a inline compaction will be run
   public static final String INLINE_COMPACT_PROP = "hoodie.compact.inline";
   // Run a compaction every N delta commits
@@ -101,6 +103,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = "false";
   private static final String DEFAULT_CLEANER_POLICY = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
   private static final String DEFAULT_AUTO_CLEAN = "true";
+  private static final String DEFAULT_ASYNC_CLEAN = "false";
   private static final String DEFAULT_INLINE_COMPACT = "false";
   private static final String DEFAULT_INCREMENTAL_CLEANER = "true";
   private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "5";
@@ -143,6 +146,11 @@ public class HoodieCompactionConfig extends 
DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withAsyncClean(Boolean asyncClean) {
+      props.setProperty(ASYNC_CLEAN_PROP, String.valueOf(asyncClean));
+      return this;
+    }
+
     public Builder withIncrementalCleaningMode(Boolean 
incrementalCleaningMode) {
       props.setProperty(CLEANER_INCREMENTAL_MODE, 
String.valueOf(incrementalCleaningMode));
       return this;
@@ -247,6 +255,8 @@ public class HoodieCompactionConfig extends 
DefaultHoodieConfig {
     public HoodieCompactionConfig build() {
       HoodieCompactionConfig config = new HoodieCompactionConfig(props);
       setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), 
AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN);
+      setDefaultOnCondition(props, !props.containsKey(ASYNC_CLEAN_PROP), 
ASYNC_CLEAN_PROP,
+              DEFAULT_ASYNC_CLEAN);
       setDefaultOnCondition(props, 
!props.containsKey(CLEANER_INCREMENTAL_MODE), CLEANER_INCREMENTAL_MODE,
           DEFAULT_INCREMENTAL_CLEANER);
       setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_PROP), 
INLINE_COMPACT_PROP,
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java 
b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 083d780..3b822f0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -296,6 +296,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig 
{
     return 
Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.AUTO_CLEAN_PROP));
   }
 
+  public boolean isAsyncClean() {
+    return 
Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.ASYNC_CLEAN_PROP));
+  }
+
   public boolean incrementalCleanerModeEnabled() {
     return 
Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.CLEANER_INCREMENTAL_MODE));
   }
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
 
b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index b3caa44..57feebc 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -185,8 +185,9 @@ public class CleanActionExecutor extends 
BaseActionExecutor<HoodieCleanMetadata>
   Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
     final HoodieCleanerPlan cleanerPlan = requestClean(jsc);
     if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
-        && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
-
+        && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()
+        && 
cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum()
 > 0) {
+      // Only create cleaner plan which does some work
       final HoodieInstant cleanInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, 
startCleanTime);
       // Save to both aux and timeline folder
       try {
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java 
b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 82f911d..541f84f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -51,6 +51,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.hudi.testutils.HoodieTestDataGenerator;
@@ -83,6 +84,7 @@ import scala.Tuple3;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -128,15 +130,8 @@ public class TestCleaner extends HoodieClientTestBase {
     HoodieTable table = HoodieTable.create(metaClient, client.getConfig(), 
hadoopConf);
 
     assertFalse(table.getCompletedCommitsTimeline().empty());
-    if (cleaningPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)) {
-      // We no longer write empty cleaner plans when there are not enough 
commits present
-      assertTrue(table.getCompletedCleanTimeline().empty());
-    } else {
-      String instantTime = 
table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
-      assertFalse(table.getCompletedCleanTimeline().empty());
-      assertEquals(instantTime, 
table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp(),
-          "The clean instant should be the same as the commit instant");
-    }
+    // We no longer write empty cleaner plans when there is nothing to be 
cleaned.
+    assertTrue(table.getCompletedCleanTimeline().empty());
 
     HoodieIndex index = HoodieIndex.createIndex(cfg);
     List<HoodieRecord> taggedRecords = 
index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect();
@@ -439,17 +434,32 @@ public class TestCleaner extends HoodieClientTestBase {
 
     if (simulateRetryFailure) {
       HoodieInstant completedCleanInstant = new HoodieInstant(State.COMPLETED, 
HoodieTimeline.CLEAN_ACTION, cleanInstantTs);
+      HoodieCleanMetadata metadata = 
CleanerUtils.getCleanerMetadata(metaClient, completedCleanInstant);
+      metadata.getPartitionMetadata().values().forEach(p -> {
+        String dirPath = metaClient.getBasePath() + "/" + p.getPartitionPath();
+        p.getSuccessDeleteFiles().forEach(p2 -> {
+          try {
+            metaClient.getFs().create(new Path(dirPath, p2), true);
+          } catch (IOException e) {
+            throw new HoodieIOException(e.getMessage(), e);
+          }
+        });
+      });
       
metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant);
-      HoodieCleanMetadata cleanMetadata2 = writeClient.clean(getNextInstant());
+      HoodieCleanMetadata newCleanMetadata = 
writeClient.clean(getNextInstant());
+      // No new clean metadata would be created. Only the previous one will be 
retried
+      assertNull(newCleanMetadata);
+      HoodieCleanMetadata cleanMetadata2 = 
CleanerUtils.getCleanerMetadata(metaClient, completedCleanInstant);
       assertEquals(cleanMetadata1.getEarliestCommitToRetain(), 
cleanMetadata2.getEarliestCommitToRetain());
-      assertEquals(new Integer(0), cleanMetadata2.getTotalFilesDeleted());
+      assertEquals(cleanMetadata1.getTotalFilesDeleted(), 
cleanMetadata2.getTotalFilesDeleted());
       assertEquals(cleanMetadata1.getPartitionMetadata().keySet(), 
cleanMetadata2.getPartitionMetadata().keySet());
       final HoodieCleanMetadata retriedCleanMetadata = 
CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), 
completedCleanInstant);
       cleanMetadata1.getPartitionMetadata().keySet().forEach(k -> {
         HoodieCleanPartitionMetadata p1 = 
cleanMetadata1.getPartitionMetadata().get(k);
         HoodieCleanPartitionMetadata p2 = 
retriedCleanMetadata.getPartitionMetadata().get(k);
         assertEquals(p1.getDeletePathPatterns(), p2.getDeletePathPatterns());
-        assertEquals(p1.getSuccessDeleteFiles(), p2.getFailedDeleteFiles());
+        assertEquals(p1.getSuccessDeleteFiles(), p2.getSuccessDeleteFiles());
+        assertEquals(p1.getFailedDeleteFiles(), p2.getFailedDeleteFiles());
         assertEquals(p1.getPartitionPath(), p2.getPartitionPath());
         assertEquals(k, p1.getPartitionPath());
       });
@@ -487,12 +497,7 @@ public class TestCleaner extends HoodieClientTestBase {
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
-    assertEquals(0,
-        getCleanStat(hoodieCleanStatsOne, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
-            .size(), "Must not clean any files");
-    assertEquals(0,
-        getCleanStat(hoodieCleanStatsOne, 
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
-            .size(), "Must not clean any files");
+    assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
         file1P0C0));
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, 
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
@@ -548,9 +553,7 @@ public class TestCleaner extends HoodieClientTestBase {
     // No cleaning on partially written file, with no commit.
     HoodieTestUtils.createDataFile(basePath, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // 
update
     List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
-    assertEquals(0,
-        getCleanStat(hoodieCleanStatsFour, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
-            .size(), "Must not clean any files");
+    assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
         file3P0C2));
   }
@@ -819,11 +822,8 @@ public class TestCleaner extends HoodieClientTestBase {
         
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
 
     List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, 
simulateFailureRetry);
-    assertEquals(0,
-        getCleanStat(hoodieCleanStatsThree, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
-            .getSuccessDeleteFiles().size(),
+    assertEquals(0, hoodieCleanStatsThree.size(),
         "Must not clean any file. We have to keep 1 version before the latest 
commit time to keep");
-
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
         file1P0C0));
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index a3d81fa..ccd5c49 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.async.AbstractAsyncService;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -326,7 +327,7 @@ public class HoodieDeltaStreamer implements Serializable {
   /**
    * Syncs data either in single-run or in continuous mode.
    */
-  public static class DeltaSyncService extends AbstractDeltaStreamerService {
+  public static class DeltaSyncService extends AbstractAsyncService {
 
     private static final long serialVersionUID = 1L;
     /**
@@ -532,7 +533,7 @@ public class HoodieDeltaStreamer implements Serializable {
   /**
    * Async Compactor Service that runs in separate thread. Currently, only one 
compactor is allowed to run at any time.
    */
-  public static class AsyncCompactService extends AbstractDeltaStreamerService 
{
+  public static class AsyncCompactService extends AbstractAsyncService {
 
     private static final long serialVersionUID = 1L;
     private final int maxConcurrentCompaction;

Reply via email to