This is an automated email from the ASF dual-hosted git repository.

garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c4f2fd  [HUDI-1984] Support independent flink hudi compaction function (#3046)
0c4f2fd is described below

commit 0c4f2fdc15982870b59d3458303973af95984266
Author: swuferhong <337361...@qq.com>
AuthorDate: Sun Jun 13 15:04:46 2021 +0800

    [HUDI-1984] Support independent flink hudi compaction function (#3046)
---
 .../FlinkScheduleCompactionActionExecutor.java     |  44 +++++++
 .../apache/hudi/configuration/FlinkOptions.java    |  35 -----
 .../java/org/apache/hudi/sink/CleanFunction.java   |   2 +-
 .../apache/hudi/sink/compact/CompactFunction.java  |  29 ++--
 .../hudi/sink/compact/CompactionCommitSink.java    |  18 ++-
 .../sink/compact/CompactionPlanSourceFunction.java | 111 ++++++++++++++++
 .../hudi/sink/compact/FlinkCompactionConfig.java   | 107 +++++++++++++++
 .../hudi/sink/compact/HoodieFlinkCompactor.java    | 146 +++++++++++++++++++++
 ...tFunction.java => NonKeyedCompactFunction.java} |  44 +++----
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |  37 +++++-
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |   6 +-
 .../java/org/apache/hudi/util/CompactionUtil.java  |  86 ++++++++++++
 .../java/org/apache/hudi/util/StreamerUtil.java    |  12 +-
 .../org/apache/hudi/sink/StreamWriteITCase.java    | 100 +++++++++++++-
 14 files changed, 692 insertions(+), 85 deletions(-)
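For orientation, the new HoodieFlinkCompactor introduced below is a standalone entry point driven by JCommander flags, with --path as its only required argument. A minimal sketch of launching it programmatically might look like the following; the table path is a placeholder and not part of this commit:

    // Illustrative sketch only: runs one round of standalone compaction
    // against a placeholder table path.
    public class CompactorLauncher {
      public static void main(String[] args) throws Exception {
        org.apache.hudi.sink.compact.HoodieFlinkCompactor.main(
            new String[] {"--path", "file:///tmp/hudi/t1"});
      }
    }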

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java
index 7db2dc8..d18cac2 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java
@@ -28,12 +28,16 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCompactionException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -50,12 +54,15 @@ public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload
 
   private static final Logger LOG = LogManager.getLogger(FlinkScheduleCompactionActionExecutor.class);
 
+  private final Option<Map<String, String>> extraMetadata;
+
   public FlinkScheduleCompactionActionExecutor(HoodieEngineContext context,
                                                HoodieWriteConfig config,
                                                HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
                                                String instantTime,
                                                Option<Map<String, String>> extraMetadata) {
     super(context, config, table, instantTime, extraMetadata);
+    this.extraMetadata = extraMetadata;
   }
 
   @Override
@@ -149,4 +156,41 @@ public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload
     }
     return timestamp;
   }
+
+  @Override
+  public Option<HoodieCompactionPlan> execute() {
+    if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
+            && !config.getFailedWritesCleanPolicy().isLazy()) {
+      // if there are inflight writes, their instantTime must not be less than that of compaction instant time
+      table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
+              .ifPresent(earliestInflight -> ValidationUtils.checkArgument(
+                      HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
+                      "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+                              + ", Compaction scheduled at " + instantTime));
+      // Committed and pending compaction instants should have strictly lower timestamps
+      List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
+              .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
+              .filter(instant -> HoodieTimeline.compareTimestamps(
+                      instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
+              .collect(Collectors.toList());
+      ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
+              "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
+                      + conflictingInstants);
+    }
+
+    HoodieCompactionPlan plan = scheduleCompaction();
+    if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) {
+      extraMetadata.ifPresent(plan::setExtraMetadata);
+      HoodieInstant compactionInstant =
+              new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
+      try {
+        table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
+                TimelineMetadataUtils.serializeCompactionPlan(plan));
+      } catch (IOException ioe) {
+        throw new HoodieIOException("Exception scheduling compaction", ioe);
+      }
+      return Option.of(plan);
+    }
+    return Option.empty();
+  }
 }
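The overridden execute() above first checks that no inflight write instant is earlier than the compaction instant and that no completed or pending compaction instant has a timestamp >= the new one, then persists the plan as a requested compaction instant. A rough usage sketch, assuming the engine context, write config, table and instant time come from the Flink write client (names here are placeholders):

    // Sketch under the assumption that context, writeConfig, hoodieTable and
    // instantTime are obtained elsewhere (e.g. via HoodieFlinkWriteClient).
    Option<HoodieCompactionPlan> plan =
        new FlinkScheduleCompactionActionExecutor<>(
            context, writeConfig, hoodieTable, instantTime, Option.empty())
            .execute();
    // An empty Option means there was nothing to compact for this instant.
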
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index d407697..b14ef6e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -25,8 +25,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
-import org.apache.hudi.streamer.FlinkStreamerConfig;
-import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
@@ -521,39 +519,6 @@ public class FlinkOptions {
   private static final String PROPERTIES_PREFIX = "properties.";
 
   /**
-   * Transforms a {@code HoodieFlinkStreamer.Config} into {@code Configuration}.
-   * The latter is more suitable for the table APIs. It reads all the properties
-   * in the properties file (set by `--props` option) and cmd line options
-   * (set by `--hoodie-conf` option).
-   */
-  @SuppressWarnings("unchecked, rawtypes")
-  public static org.apache.flink.configuration.Configuration fromStreamerConfig(FlinkStreamerConfig config) {
-    Map<String, String> propsMap = new HashMap<String, String>((Map) StreamerUtil.getProps(config));
-    org.apache.flink.configuration.Configuration conf = fromMap(propsMap);
-
-    conf.setString(FlinkOptions.PATH, config.targetBasePath);
-    conf.setString(READ_AVRO_SCHEMA_PATH, config.readSchemaFilePath);
-    conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
-    // copy_on_write works same as COPY_ON_WRITE
-    conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
-    conf.setString(FlinkOptions.OPERATION, config.operation.value());
-    conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
-    conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
-    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
-    conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes));
-    conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval));
-    conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors);
-    conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField);
-    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField);
-    // keygenClass has higher priority than keygenType
-    conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
-    conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
-    conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
-
-    return conf;
-  }
-
-  /**
    * Collects the config options that start with 'properties.' into a 'key'='value' list.
    */
   public static Map<String, String> getHoodieProperties(Map<String, String> options) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 14dd827..1ca593f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -46,7 +46,7 @@ public class CleanFunction<T> extends AbstractRichFunction
 
   private final Configuration conf;
 
-  private HoodieFlinkWriteClient writeClient;
+  protected HoodieFlinkWriteClient writeClient;
   private NonThrownExecutor executor;
 
   private volatile boolean isCleaning;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
index ee8678b..13a9f45 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
@@ -33,6 +33,7 @@ import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.List;
 
 /**
@@ -79,22 +80,24 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEv
     final CompactionOperation compactionOperation = event.getOperation();
     // executes the compaction task asynchronously to not block the checkpoint barrier propagate.
     executor.execute(
-        () -> {
-          HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
-          List<WriteStatus> writeStatuses = compactor.compact(
-              new HoodieFlinkCopyOnWriteTable<>(
-                  this.writeClient.getConfig(),
-                  this.writeClient.getEngineContext(),
-                  this.writeClient.getHoodieTable().getMetaClient()),
-              this.writeClient.getHoodieTable().getMetaClient(),
-              this.writeClient.getConfig(),
-              compactionOperation,
-              instantTime);
-          collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
-        }, "Execute compaction for instant %s from task %d", instantTime, taskID
+        () -> doCompaction(instantTime, compactionOperation, collector), "Execute compaction for instant %s from task %d", instantTime, taskID
     );
   }
 
+  private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
+    HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
+    List<WriteStatus> writeStatuses = compactor.compact(
+        new HoodieFlinkCopyOnWriteTable<>(
+            this.writeClient.getConfig(),
+            this.writeClient.getEngineContext(),
+            this.writeClient.getHoodieTable().getMetaClient()),
+        this.writeClient.getHoodieTable().getMetaClient(),
+        this.writeClient.getConfig(),
+        compactionOperation,
+        instantTime);
+    collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
+  }
+
   @VisibleForTesting
   public void setExecutor(NonThrownExecutor executor) {
     this.executor = executor;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 41831cd..0884342 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -19,12 +19,12 @@
 package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -60,11 +60,6 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
   private final Configuration conf;
 
   /**
-   * Write Client.
-   */
-  private transient HoodieFlinkWriteClient writeClient;
-
-  /**
   * Buffer to collect the event from each compact task {@code CompactFunction}.
    * The key is the instant time.
    */
@@ -78,7 +73,9 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
   @Override
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
-    this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
+    if (writeClient == null) {
+      this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
+    }
     this.commitBuffer = new HashMap<>();
   }
 
@@ -122,6 +119,13 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
     }
     // commit the compaction
     this.writeClient.commitCompaction(instant, statuses, Option.empty());
+
+    // If async cleaning is not enabled elsewhere, clean up the old log files after the compaction commits
+    if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+      this.writeClient.startAsyncCleaning();
+      this.writeClient.waitForCleaningFinish();
+    }
+
     // reset the status
     reset(instant);
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
new file mode 100644
index 0000000..5d5a008
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.table.HoodieFlinkTable;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Flink hudi compaction source function.
+ *
+ * <p>This function reads the compaction plan as {@link CompactionOperation}s and assigns the compaction task
+ * events {@link CompactionPlanEvent} to the downstream operators.
+ *
+ * <p>The compaction instant time is specified explicitly with the following strategies:
+ *
+ * <ul>
+ *   <li>If the timeline has no inflight instants,
+ *   use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
+ *   as the instant time;</li>
+ *   <li>If the timeline has inflight instants,
+ *   use the {earliest inflight instant time - 1ms} as the instant time.</li>
+ * </ul>
+ */
+public class CompactionPlanSourceFunction extends AbstractRichFunction implements SourceFunction<CompactionPlanEvent> {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(CompactionPlanSourceFunction.class);
+
+  /**
+   * Compaction instant time.
+   */
+  private String compactionInstantTime;
+
+  /**
+   * Hoodie flink table.
+   */
+  private HoodieFlinkTable<?> table;
+
+  /**
+   * The compaction plan.
+   */
+  private HoodieCompactionPlan compactionPlan;
+
+  /**
+   * Hoodie instant.
+   */
+  private HoodieInstant instant;
+
+  public CompactionPlanSourceFunction(HoodieFlinkTable<?> table, HoodieInstant instant, HoodieCompactionPlan compactionPlan, String compactionInstantTime) {
+    this.table = table;
+    this.instant = instant;
+    this.compactionPlan = compactionPlan;
+    this.compactionInstantTime = compactionInstantTime;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    // no operation
+  }
+
+  @Override
+  public void run(SourceContext sourceContext) throws Exception {
+    // Mark instant as compaction inflight
+    table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
+    table.getMetaClient().reloadActiveTimeline();
+
+    List<CompactionOperation> operations = this.compactionPlan.getOperations().stream()
+        .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
+    LOG.info("CompactionPlanSourceFunction compacting " + operations + " files");
+    for (CompactionOperation operation : operations) {
+      sourceContext.collect(new CompactionPlanEvent(compactionInstantTime, operation));
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    // no operation
+  }
+
+  @Override
+  public void cancel() {
+    // no operation
+  }
+}
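The source transitions the requested compaction instant to inflight and emits one CompactionPlanEvent per operation in the plan. The wiring below is a condensed sketch of the topology that HoodieFlinkCompactor builds further down in this commit; env, table, instant, compactionPlan, compactionInstantTime and conf are assumed to be prepared as in that class:

    // Condensed sketch of the compaction topology (see HoodieFlinkCompactor below).
    env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime))
        .rebalance()
        .transform("compact_task",
            TypeInformation.of(CompactionCommitEvent.class),
            new ProcessOperator<>(new NonKeyedCompactFunction(conf)))
        .setParallelism(compactionPlan.getOperations().size())
        .addSink(new CompactionCommitSink(conf))
        .setParallelism(1);
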
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
new file mode 100644
index 0000000..8e0c671
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import com.beust.jcommander.Parameter;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Configurations for Hoodie Flink compaction.
+ */
+public class FlinkCompactionConfig extends Configuration {
+
+  @Parameter(names = {"--help", "-h"}, help = true)
+  public Boolean help = false;
+
+  // ------------------------------------------------------------------------
+  //  Hudi Write Options
+  // ------------------------------------------------------------------------
+
+  @Parameter(names = {"--path"}, description = "Base path for the target 
hoodie table.", required = true)
+  public String path;
+
+  // ------------------------------------------------------------------------
+  //  Compaction Options
+  // ------------------------------------------------------------------------
+
+  public static final String NUM_COMMITS = "num_commits";
+  public static final String TIME_ELAPSED = "time_elapsed";
+  public static final String NUM_AND_TIME = "num_and_time";
+  public static final String NUM_OR_TIME = "num_or_time";
+  @Parameter(names = {"--compaction-trigger-strategy"},
+      description = "Strategy to trigger compaction, options are 
'num_commits': trigger compaction when reach N delta commits;\n"
+          + "'time_elapsed': trigger compaction when time elapsed > N seconds 
since last compaction;\n"
+          + "'num_and_time': trigger compaction when both NUM_COMMITS and 
TIME_ELAPSED are satisfied;\n"
+          + "'num_or_time': trigger compaction when NUM_COMMITS or 
TIME_ELAPSED is satisfied.\n"
+          + "Default is 'num_commits'",
+      required = false)
+  public String compactionTriggerStrategy = NUM_COMMITS;
+
+  @Parameter(names = {"--compaction-delta-commits"}, description = "Max delta 
commits needed to trigger compaction, default 5 commits", required = false)
+  public Integer compactionDeltaCommits = 1;
+
+  @Parameter(names = {"--compaction-delta-seconds"}, description = "Max delta 
seconds time needed to trigger compaction, default 1 hour", required = false)
+  public Integer compactionDeltaSeconds = 3600;
+
+  @Parameter(names = {"--clean-async-enabled"}, description = "Whether to 
cleanup the old commits immediately on new commits, enabled by default", 
required = false)
+  public Boolean cleanAsyncEnable =  false;
+
+  @Parameter(names = {"--clean-retain-commits"},
+      description = "Number of commits to retain. So data will be retained for 
num_of_commits * time_between_commits (scheduled).\n"
+          + "This also directly translates into how much you can incrementally 
pull on this table, default 10",
+      required = false)
+  public Integer cleanRetainCommits = 10;
+
+  @Parameter(names = {"--archive-min-commits"},
+      description = "Min number of commits to keep before archiving older 
commits into a sequential log, default 20.",
+      required = false)
+  public Integer archiveMinCommits = 20;
+
+  @Parameter(names = {"--archive-max-commits"},
+      description = "Max number of commits to keep before archiving older 
commits into a sequential log, default 30.",
+      required = false)
+  public Integer archiveMaxCommits = 30;
+
+  @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in 
MB for compaction spillable map, default 100MB.", required = false)
+  public Integer compactionMaxMemory = 100;
+
+  /**
+   * Transforms a {@code FlinkCompactionConfig} into a Flink {@code Configuration}.
+   * The latter is more suitable for the table APIs.
+   */
+  public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkCompactionConfig config) {
+    org.apache.flink.configuration.Configuration conf = new Configuration();
+
+    conf.setString(FlinkOptions.PATH, config.path);
+    conf.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, config.compactionTriggerStrategy);
+    conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);
+    conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, config.archiveMinCommits);
+    conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, config.compactionDeltaCommits);
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds);
+    conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
+    conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
+
+    return conf;
+  }
+}
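As a usage sketch, the flags above are parsed with JCommander and then translated into a Flink Configuration via toFlinkConfig, mirroring the HoodieFlinkCompactor entry point below; the path and the overridden delta-commit count are placeholders:

    // Parse placeholder CLI arguments into FlinkCompactionConfig and convert
    // them into the Flink Configuration consumed by the compaction pipeline.
    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
    new JCommander(cfg, null, "--path", "file:///tmp/hudi/t1", "--compaction-delta-commits", "5");
    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
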
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
new file mode 100644
index 0000000..57b5c06
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+
+import com.beust.jcommander.JCommander;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Flink hudi compaction program that can be executed manually.
+ */
+public class HoodieFlinkCompactor {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);
+
+  public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+
+    // create the meta client
+    HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
+
+    // get the table name
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+    // set the table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    // compute the compaction instant time and schedule the compaction
+    String instantTime = CompactionUtil.getCompactionInstantTime(metaClient);
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
+    writeClient.scheduleCompactionAtInstant(instantTime, Option.empty());
+
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+    // the last instant takes the highest priority.
+    Option<HoodieInstant> compactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
+    String compactionInstantTime = compactionInstant.isPresent() ? compactionInstant.get().getTimestamp() : null;
+    if (compactionInstantTime == null) {
+      // do nothing.
+      LOG.info("No compaction plan for this job");
+      return;
+    }
+    // generate compaction plan
+    // should support configurable commit metadata
+    HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
+        table.getMetaClient(), compactionInstantTime);
+
+    if (compactionPlan == null || (compactionPlan.getOperations() == null)
+        || (compactionPlan.getOperations().isEmpty())) {
+      // No compaction plan, do nothing and return.
+      LOG.info("No compaction plan for this job and instant " + compactionInstantTime);
+      return;
+    }
+
+    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
+    if (!pendingCompactionTimeline.containsInstant(instant)) {
+      // this means that the compaction plan was written to the auxiliary path(.tmp)
+      // but not the meta path(.hoodie), which usually happens when the job
+      // crashes unexpectedly.
+
+      // clean the compaction plan in the auxiliary path and cancel the compaction.
+
+      LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+          + "Clean the compaction plan in the auxiliary path and cancel the compaction");
+      cleanInstant(table.getMetaClient(), instant);
+      return;
+    }
+
+    env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime))
+        .name("compaction_source")
+        .uid("uid_compaction_source")
+        .rebalance()
+        .transform("compact_task",
+            TypeInformation.of(CompactionCommitEvent.class),
+            new ProcessOperator<>(new NonKeyedCompactFunction(conf)))
+        .setParallelism(compactionPlan.getOperations().size())
+        .addSink(new CompactionCommitSink(conf))
+        .name("clean_commits")
+        .uid("uid_clean_commits")
+        .setParallelism(1);
+
+    env.execute("flink_hudi_compaction");
+  }
+
+  private static void cleanInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
+    Path commitFilePath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
+    try {
+      if (metaClient.getFs().exists(commitFilePath)) {
+        boolean deleted = metaClient.getFs().delete(commitFilePath, false);
+        if (deleted) {
+          LOG.info("Removed instant " + instant);
+        } else {
+          throw new HoodieIOException("Could not delete instant " + instant);
+        }
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e);
+    }
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/NonKeyedCompactFunction.java
similarity index 64%
copy from hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
copy to hudi-flink/src/main/java/org/apache/hudi/sink/compact/NonKeyedCompactFunction.java
index ee8678b..f1be78c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/NonKeyedCompactFunction.java
@@ -28,19 +28,20 @@ import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.List;
 
 /**
  * Function to execute the actual compaction task assigned by the compaction plan task.
- * In order to execute scalable, the input should shuffle by the compact event {@link CompactionPlanEvent}.
+ * The input compaction events {@link CompactionPlanEvent} are distributed evenly to this function.
  */
-public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEvent, CompactionCommitEvent> {
-  private static final Logger LOG = LoggerFactory.getLogger(CompactFunction.class);
+public class NonKeyedCompactFunction extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> {
+  private static final Logger LOG = LoggerFactory.getLogger(NonKeyedCompactFunction.class);
 
   /**
    * Config options.
@@ -62,7 +63,7 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEv
    */
   private transient NonThrownExecutor executor;
 
-  public CompactFunction(Configuration conf) {
+  public NonKeyedCompactFunction(Configuration conf) {
     this.conf = conf;
   }
 
@@ -74,25 +75,24 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEv
   }
 
   @Override
-  public void processElement(CompactionPlanEvent event, Context context, Collector<CompactionCommitEvent> collector) throws Exception {
+  public void processElement(CompactionPlanEvent event, Context ctx, Collector<CompactionCommitEvent> collector) throws Exception {
     final String instantTime = event.getCompactionInstantTime();
     final CompactionOperation compactionOperation = event.getOperation();
-    // executes the compaction task asynchronously to not block the checkpoint barrier propagate.
-    executor.execute(
-        () -> {
-          HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
-          List<WriteStatus> writeStatuses = compactor.compact(
-              new HoodieFlinkCopyOnWriteTable<>(
-                  this.writeClient.getConfig(),
-                  this.writeClient.getEngineContext(),
-                  this.writeClient.getHoodieTable().getMetaClient()),
-              this.writeClient.getHoodieTable().getMetaClient(),
-              this.writeClient.getConfig(),
-              compactionOperation,
-              instantTime);
-          collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
-        }, "Execute compaction for instant %s from task %d", instantTime, taskID
-    );
+    doCompaction(instantTime, compactionOperation, collector);
+  }
+
+  private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
+    HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
+    List<WriteStatus> writeStatuses = compactor.compact(
+        new HoodieFlinkCopyOnWriteTable<>(
+            this.writeClient.getConfig(),
+            this.writeClient.getEngineContext(),
+            this.writeClient.getHoodieTable().getMetaClient()),
+        this.writeClient.getHoodieTable().getMetaClient(),
+        this.writeClient.getConfig(),
+        compactionOperation,
+        instantTime);
+    collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
   }
 
   @VisibleForTesting
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 3b43a0d..843e9bf 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -21,13 +21,17 @@ package org.apache.hudi.streamer;
 import org.apache.hudi.client.utils.OperationConverter;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
+import org.apache.hudi.util.StreamerUtil;
 
 import com.beust.jcommander.Parameter;
 import org.apache.flink.configuration.Configuration;
-import org.apache.hudi.keygen.constant.KeyGeneratorType;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Configurations for Hoodie Flink streamer.
@@ -124,4 +128,35 @@ public class FlinkStreamerConfig extends Configuration {
 
   @Parameter(names = {"--write-task-num"}, description = "Parallelism of tasks 
that do actual write, default is 4.")
   public Integer writeTaskNum = 4;
+
+  /**
+   * Transforms a {@code FlinkStreamerConfig} into {@code Configuration}.
+   * The latter is more suitable for the table APIs. It reads all the properties
+   * in the properties file (set by `--props` option) and cmd line options
+   * (set by `--hoodie-conf` option).
+   */
+  @SuppressWarnings("unchecked, rawtypes")
+  public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkStreamerConfig config) {
+    Map<String, String> propsMap = new HashMap<String, String>((Map) StreamerUtil.getProps(config));
+    org.apache.flink.configuration.Configuration conf = fromMap(propsMap);
+
+    conf.setString(FlinkOptions.PATH, config.targetBasePath);
+    conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, config.readSchemaFilePath);
+    conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
+    // copy_on_write works same as COPY_ON_WRITE
+    conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
+    conf.setString(FlinkOptions.OPERATION, config.operation.value());
+    conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
+    conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
+    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
+    conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes));
+    conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval));
+    conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors);
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField);
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField);
+    conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
+    conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
+
+    return conf;
+  }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 9e77e73..eaca4c9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -23,11 +23,11 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperatorFactory;
 import org.apache.hudi.sink.bootstrap.BootstrapFunction;
+import org.apache.hudi.sink.compact.CompactFunction;
 import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;
-import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.compact.CompactionPlanEvent;
-import org.apache.hudi.sink.compact.CompactFunction;
+import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.util.AvroSchemaConverter;
@@ -81,7 +81,7 @@ public class HoodieFlinkStreamer {
     RowType rowType =
         (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
             .getLogicalType();
-    Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
+    Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
     int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS);
     StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
         new StreamWriteOperatorFactory<>(conf);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
new file mode 100644
index 0000000..5ffd58d
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for flink hudi compaction.
+ */
+public class CompactionUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionUtil.class);
+
+  /**
+   * Creates the meta client.
+   */
+  public static HoodieTableMetaClient createMetaClient(Configuration conf) {
+    return HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(FlinkClientUtil.getHadoopConf()).build();
+  }
+
+  /**
+   * Gets the compaction instant time.
+   */
+  public static String getCompactionInstantTime(HoodieTableMetaClient metaClient) {
+    Option<HoodieInstant> hoodieInstantOption = metaClient.getCommitsTimeline().filterPendingExcludingCompaction().firstInstant();
+    if (hoodieInstantOption.isPresent()) {
+      HoodieInstant firstInstant = hoodieInstantOption.get();
+      String newCommitTime = StreamerUtil.instantTimeSubtract(firstInstant.getTimestamp(), 1);
+      // Committed and pending compaction instants should have strictly lower timestamps
+      List<HoodieInstant> conflictingInstants = metaClient.getActiveTimeline()
+              .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
+              .filter(instant -> HoodieTimeline.compareTimestamps(
+                      instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, newCommitTime))
+              .collect(Collectors.toList());
+      ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
+              "Following instants have timestamps >= compactionInstant (" + newCommitTime + ") Instants :"
+                      + conflictingInstants);
+      return newCommitTime;
+    } else {
+      return HoodieActiveTimeline.createNewInstantTime();
+    }
+  }
+
+  /**
+   * Sets up the avro schema string into the given configuration {@code conf}
+   * through reading from the hoodie table metadata.
+   *
+   * @param conf       The configuration
+   * @param metaClient The meta client of the hoodie table
+   */
+  public static void setAvroSchema(Configuration conf, HoodieTableMetaClient metaClient) throws Exception {
+    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+    Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
+    conf.setString(FlinkOptions.READ_AVRO_SCHEMA, tableAvroSchema.toString());
+  }
+}
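Taken together, these helpers drive the bootstrap sequence of the standalone compactor: create the meta client from the configured path, resolve the table name and schema, then compute the compaction instant time. A short sketch, assuming conf already carries FlinkOptions.PATH:

    // Sketch of the bootstrap sequence used by HoodieFlinkCompactor#main.
    HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
    CompactionUtil.setAvroSchema(conf, metaClient);
    String instantTime = CompactionUtil.getCompactionInstantTime(metaClient);
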
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index b9b6ce9..caecba4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -90,7 +90,7 @@ public class StreamerUtil {
   }
 
   public static Schema getSourceSchema(FlinkStreamerConfig cfg) {
-    return new FilebasedSchemaProvider(FlinkOptions.fromStreamerConfig(cfg)).getSourceSchema();
+    return new FilebasedSchemaProvider(FlinkStreamerConfig.toFlinkConfig(cfg)).getSourceSchema();
   }
 
   public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) {
@@ -150,7 +150,7 @@ public class StreamerUtil {
   }
 
   public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig conf) {
-    return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf));
+    return getHoodieClientConfig(FlinkStreamerConfig.toFlinkConfig(conf));
   }
 
   public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
@@ -302,4 +302,12 @@ public class StreamerUtil {
     long oldTime = Long.parseLong(oldInstant);
     return String.valueOf(oldTime + milliseconds);
   }
+
+  /**
+   * Subtracts the given milliseconds from the old instant time and returns the result.
+   */
+  public static String instantTimeSubtract(String oldInstant, long milliseconds) {
+    long oldTime = Long.parseLong(oldInstant);
+    return String.valueOf(oldTime - milliseconds);
+  }
 }
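Since instant times are plain numeric strings, the new helper simply parses and subtracts. For example, stepping back 1 ms from an instant:

    // "20210613150446" minus 1 millisecond yields "20210613150445".
    String earlier = StreamerUtil.instantTimeSubtract("20210613150446", 1);
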
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index ad9c9dc..b001c54 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -18,8 +18,15 @@
 
 package org.apache.hudi.sink;
 
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.bootstrap.BootstrapFunction;
 import org.apache.hudi.sink.compact.CompactFunction;
@@ -27,10 +34,15 @@ import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;
 import org.apache.hudi.sink.compact.CompactionPlanEvent;
 import org.apache.hudi.sink.compact.CompactionPlanOperator;
+import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+import org.apache.hudi.sink.compact.NonKeyedCompactFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignOperator;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
+import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
@@ -50,13 +62,20 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.TestLogger;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
@@ -72,6 +91,8 @@ import java.util.concurrent.TimeUnit;
  */
 public class StreamWriteITCase extends TestLogger {
 
+  protected static final Logger LOG = LoggerFactory.getLogger(StreamWriteITCase.class);
+
   private static final Map<String, List<String>> EXPECTED = new HashMap<>();
 
   static {
@@ -148,6 +169,83 @@ public class StreamWriteITCase extends TestLogger {
   }
 
   @Test
+  public void testHoodieFlinkCompactor() throws Exception {
+    // Create the hoodie table and insert data.
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+            .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 values\n"
+            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
+    TableResult tableResult = tableEnv.executeSql(insertInto);
+    TimeUnit.SECONDS.sleep(5);
+    tableResult.await();
+
+    // Make configuration and setAvroSchema.
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+    conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD.key(), "partition");
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
+
+    // set the table name
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    // compute the compaction instant time and schedule the compaction
+    String instantTime = CompactionUtil.getCompactionInstantTime(metaClient);
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
+    writeClient.scheduleCompactionAtInstant(instantTime, Option.empty());
+
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+    // the last instant takes the highest priority.
+    Option<HoodieInstant> compactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
+    String compactionInstantTime = compactionInstant.get().getTimestamp();
+
+    // generate compaction plan
+    // should support configurable commit metadata
+    HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
+            table.getMetaClient(), compactionInstantTime);
+
+    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+
+    env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime))
+            .name("compaction_source")
+            .uid("uid_compaction_source")
+            .rebalance()
+            .transform("compact_task",
+                    TypeInformation.of(CompactionCommitEvent.class),
+                    new ProcessOperator<>(new NonKeyedCompactFunction(conf)))
+            .setParallelism(compactionPlan.getOperations().size())
+            .addSink(new CompactionCommitSink(conf))
+            .name("clean_commits")
+            .uid("uid_clean_commits")
+            .setParallelism(1);
+
+    env.execute("flink_hudi_compaction");
+    TestData.checkWrittenFullData(tempFile, EXPECTED);
+  }
+
+  @Test
   public void testMergeOnReadWriteWithCompaction() throws Exception {
     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
