This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cc33f53c794 [HUDI-6366] Prevent flink offline table service rerun completed instant (#8950)
cc33f53c794 is described below

commit cc33f53c794a3295044daacaaed900c77274bbba
Author: Bingeng Huang <[email protected]>
AuthorDate: Fri Jun 16 00:16:10 2023 +0800

    [HUDI-6366] Prevent flink offline table service rerun completed instant (#8950)
    
    ---------
    
    Co-authored-by: hbg <[email protected]>
---
 .../clustering/ClusteringPlanSourceFunction.java   |  17 ++-
 .../sink/clustering/HoodieFlinkClusteringJob.java  |   2 +-
 .../sink/compact/CompactionPlanSourceFunction.java |  11 +-
 .../hudi/sink/compact/HoodieFlinkCompactor.java    |   2 +-
 .../sink/cluster/ClusteringCommitTestSink.java     |  47 ++++++++
 .../sink/cluster/ITTestHoodieFlinkClustering.java  | 132 ++++++++++++++++++++-
 .../sink/compact/CompactionCommitTestSink.java     |  54 +++++++++
 .../sink/compact/ITTestHoodieFlinkCompactor.java   | 123 ++++++++++++++++++-
 8 files changed, 375 insertions(+), 13 deletions(-)
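
The patch below makes the offline clustering and compaction plan sources
consult the active timeline before emitting work: only an instant that is
still pending gets executed, so a restarted job cannot re-run an instant
that already completed. A minimal sketch of the pattern follows (the class
and names are illustrative only; a plain Set stands in for Hudi's timeline
API):

    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    // Sketch, not the committed code: emit work only for instants that
    // are still pending; anything already completed is skipped.
    final class PendingInstantGuard {
      // Stand-in for the timeline's pending replace/compaction instants.
      private final Set<String> pendingInstants;

      PendingInstantGuard(Set<String> pendingInstants) {
        this.pendingInstants = pendingInstants;
      }

      // Keep only the planned instants that are still safe to execute.
      List<String> runnableInstants(List<String> plannedInstants) {
        return plannedInstants.stream()
            .filter(pendingInstants::contains)
            .collect(Collectors.toList());
      }
    }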

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
index fafaf9a1ce9..ed78e33c10f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.common.model.ClusteringGroupInfo;
 import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.Configuration;
@@ -60,9 +61,12 @@ public class ClusteringPlanSourceFunction extends AbstractRichFunction implement
    */
   private final String clusteringInstantTime;
 
-  public ClusteringPlanSourceFunction(String clusteringInstantTime, HoodieClusteringPlan clusteringPlan) {
+  private final Configuration conf;
+
+  public ClusteringPlanSourceFunction(String clusteringInstantTime, HoodieClusteringPlan clusteringPlan, Configuration conf) {
     this.clusteringInstantTime = clusteringInstantTime;
     this.clusteringPlan = clusteringPlan;
+    this.conf = conf;
   }
 
   @Override
@@ -72,9 +76,14 @@ public class ClusteringPlanSourceFunction extends AbstractRichFunction implement

  @Override
  public void run(SourceContext<ClusteringPlanEvent> sourceContext) throws Exception {
-    for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
-      LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size());
-      sourceContext.collect(new ClusteringPlanEvent(this.clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams()));
+    boolean isPending = StreamerUtil.createMetaClient(conf).getActiveTimeline().filterPendingReplaceTimeline().containsInstant(clusteringInstantTime);
+    if (isPending) {
+      for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
+        LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size());
+        sourceContext.collect(new ClusteringPlanEvent(this.clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams()));
+      }
+    } else {
+      LOG.warn(clusteringInstantTime + " not found in pending clustering instants.");
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index 223f85defca..a47dfe39586 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -330,7 +330,7 @@ public class HoodieFlinkClusteringJob {
       long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
       conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
 
-      DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstant.getTimestamp(), clusteringPlan))
+      DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstant.getTimestamp(), clusteringPlan, conf))
           .name("clustering_source")
           .uid("uid_clustering_source")
           .rebalance()
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
index 883ba8bd114..6e48c0b386c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
@@ -20,7 +20,9 @@ package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.Configuration;
@@ -56,9 +58,11 @@ public class CompactionPlanSourceFunction extends AbstractRichFunction implement
    * compaction plan instant -> compaction plan
    */
   private final List<Pair<String, HoodieCompactionPlan>> compactionPlans;
+  private final Configuration conf;
 
-  public CompactionPlanSourceFunction(List<Pair<String, HoodieCompactionPlan>> compactionPlans) {
+  public CompactionPlanSourceFunction(List<Pair<String, HoodieCompactionPlan>> compactionPlans, Configuration conf) {
     this.compactionPlans = compactionPlans;
+    this.conf = conf;
   }
 
   @Override
@@ -68,7 +72,12 @@ public class CompactionPlanSourceFunction extends AbstractRichFunction implement
 
   @Override
   public void run(SourceContext sourceContext) throws Exception {
+    HoodieTimeline pendingCompactionTimeline = StreamerUtil.createMetaClient(conf).getActiveTimeline().filterPendingCompactionTimeline();
     for (Pair<String, HoodieCompactionPlan> pair : compactionPlans) {
+      if (!pendingCompactionTimeline.containsInstant(pair.getLeft())) {
+        LOG.warn(pair.getLeft() + " not found in pending compaction instants.");
+        continue;
+      }
       HoodieCompactionPlan compactionPlan = pair.getRight();
       List<CompactionOperation> operations = compactionPlan.getOperations().stream()
           .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index e396897dc7e..74fe7929607 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -290,7 +290,7 @@ public class HoodieFlinkCompactor {
       table.getMetaClient().reloadActiveTimeline();
 
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-      env.addSource(new CompactionPlanSourceFunction(compactionPlans))
+      env.addSource(new CompactionPlanSourceFunction(compactionPlans, conf))
           .name("compaction_source")
           .uid("uid_compaction_source")
           .rebalance()
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ClusteringCommitTestSink.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ClusteringCommitTestSink.java
new file mode 100644
index 00000000000..96f8d7423a1
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ClusteringCommitTestSink.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.cluster;
+
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.List;
+
+/**
+ * Test sink that throws on the first attempt once the clustering instant has
+ * been committed, to simulate a failover right after a successful commit.
+ */
+public class ClusteringCommitTestSink extends ClusteringCommitSink {
+  public ClusteringCommitTestSink(Configuration conf) {
+    super(conf);
+  }
+
+  @Override
+  public void invoke(ClusteringCommitEvent event, Context context) throws Exception {
+    super.invoke(event, context);
+    List<HoodieInstant> instants = writeClient.getHoodieTable().getMetaClient().getActiveTimeline().getInstants();
+    boolean committed = instants.stream().anyMatch(i -> i.getTimestamp().equals(event.getInstant()) && i.isCompleted());
+    if (committed && getRuntimeContext().getAttemptNumber() == 0) {
+      throw new HoodieException("Fail first attempt to simulate failover in test.");
+    }
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index 8f7fcf19fde..18a8aebb8fd 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.cluster;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -47,21 +48,29 @@ import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestSQL;
 
 import org.apache.avro.Schema;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
@@ -74,6 +83,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -174,7 +184,7 @@ public class ITTestHoodieFlinkClustering {
    final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
     final RowType rowType = (RowType) rowDataType.getLogicalType();
 
-    DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan))
+    DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan, conf))
         .name("clustering_source")
         .uid("uid_clustering_source")
         .rebalance()
@@ -383,7 +393,7 @@ public class ITTestHoodieFlinkClustering {
     final RowType rowType = (RowType) rowDataType.getLogicalType();
 
     DataStream<ClusteringCommitEvent> dataStream =
-        env.addSource(new ClusteringPlanSourceFunction(firstClusteringInstant, clusteringPlan))
+        env.addSource(new ClusteringPlanSourceFunction(firstClusteringInstant, clusteringPlan, conf))
             .name("clustering_source")
             .uid("uid_clustering_source")
             .rebalance()
@@ -574,7 +584,7 @@ public class ITTestHoodieFlinkClustering {
    HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
 
-    DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan))
+    DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan, conf))
         .name("clustering_source")
         .uid("uid_clustering_source")
         .rebalance()
@@ -602,4 +612,120 @@ public class ITTestHoodieFlinkClustering {
    expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4, id8,par4,id8,Han,56,8100001,par4]");
     TestData.checkWrittenData(tempFile, expected, 4);
   }
+
+  @Test
+  public void testOfflineClusterFailoverAfterCommit() throws Exception {
+    StreamTableEnvironment tableEnv = prepareEnvAndTable();
+
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    cfg.targetPartitions = 4;
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+    assertDoesNotThrow(() -> runOfflineCluster(tableEnv, conf));
+
+    Table result = tableEnv.sqlQuery("select count(*) from t1");
+    assertEquals(16L, tableEnv.toDataStream(result, Row.class).executeAndCollect(1).get(0).getField(0));
+  }
+
+  private StreamTableEnvironment prepareEnvAndTable() {
+    // Create hoodie table and insert into data.
+    Configuration conf = new org.apache.flink.configuration.Configuration();
+    conf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+    tEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+    tEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DML_SYNC, true);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+    // use append mode
+    options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
+    options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name());
+
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tEnv.executeSql(hoodieTableDDL);
+    tEnv.executeSql(TestSQL.INSERT_T1);
+    return tEnv;
+  }
+
+  /**
+   * Schedules clustering, inserts another batch, then runs the clustering plan.
+   */
+  private void runOfflineCluster(TableEnvironment tableEnv, Configuration conf) throws Exception {
+    // Make configuration and setAvroSchema.
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(1)));
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    // set the table name
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+    // set record key field
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
+    // set partition field
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
+
+    long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    // judge whether have operation
+    // To compute the clustering instant time and do clustering.
+    String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+    HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf);
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+    boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+
+    assertTrue(scheduled, "The clustering plan should be scheduled");
+
+    tableEnv.executeSql(TestSQL.INSERT_T1);
+
+    // fetch the instant based on the configured execution sequence
+    table.getMetaClient().reloadActiveTimeline();
+    HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
+        .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
+
+    // generate clustering plan
+    // should support configurable commit metadata
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), timeline.lastInstant().get());
+
+    HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
+
+    // Mark instant as clustering inflight
+    HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
+    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
+
+    final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
+    final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+    final RowType rowType = (RowType) rowDataType.getLogicalType();
+
+    DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan, conf))
+        .name("clustering_source")
+        .uid("uid_clustering_source")
+        .rebalance()
+        .transform("clustering_task",
+            TypeInformation.of(ClusteringCommitEvent.class),
+            new ClusteringOperator(conf, rowType))
+        .setParallelism(clusteringPlan.getInputGroups().size());
+
+    ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+        conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+
+    dataStream
+        .addSink(new ClusteringCommitTestSink(conf))
+        .name("clustering_commit")
+        .uid("uid_clustering_commit")
+        .setParallelism(1);
+
+    env.execute("flink_hudi_clustering");
+  }
 }
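
The test sinks added in this patch (ClusteringCommitTestSink above,
CompactionCommitTestSink below) let the commit succeed and then throw on
the first attempt, forcing a Flink restart that replays the
already-committed instant; the guarded plan source must then skip it. A
condensed sketch of that failover-simulation pattern (illustrative names,
not the actual Hudi/Flink types):

    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch, not the committed test code: fail exactly once after the
    // instant is observed as committed, so the restarted job exercises
    // the skip-completed-instant path.
    final class FailOnceAfterCommit {
      private final AtomicInteger attempts = new AtomicInteger();

      void afterCommit(String instant, boolean committed) {
        if (committed && attempts.getAndIncrement() == 0) {
          throw new IllegalStateException("Simulated failover after committing " + instant);
        }
      }
    }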
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/CompactionCommitTestSink.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/CompactionCommitTestSink.java
new file mode 100644
index 00000000000..28e75871b9d
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/CompactionCommitTestSink.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import org.apache.hudi.client.HoodieTimelineArchiver;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.List;
+
+/**
+ * Test sink that forces the committed compaction instant to archive and then
+ * throws on the first attempt, to simulate a failover right after a
+ * successful commit.
+ */
+public class CompactionCommitTestSink extends CompactionCommitSink {
+  public CompactionCommitTestSink(Configuration conf) {
+    super(conf);
+  }
+
+  @Override
+  public void invoke(CompactionCommitEvent event, Context context) throws Exception {
+    super.invoke(event, context);
+    List<HoodieInstant> instants = writeClient.getHoodieTable().getMetaClient().getActiveTimeline().getInstants();
+    boolean compactCommitted = instants.stream().anyMatch(i -> i.getTimestamp().equals(event.getInstant()) && i.isCompleted());
+    if (compactCommitted && getRuntimeContext().getAttemptNumber() == 0) {
+      // archive compact instant
+      this.writeClient.getConfig().setValue(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP, "1");
+      this.writeClient.getConfig().setValue(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP, "1");
+      HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(this.writeClient.getConfig(), this.writeClient.getHoodieTable());
+      this.writeClient.getHoodieTable().getMetaClient().reloadActiveTimeline();
+      archiver.archiveIfRequired(HoodieFlinkEngineContext.DEFAULT);
+      throw new HoodieException("Fail first attempt to simulate failover in test.");
+    }
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index bf5c13f366c..b032ad46765 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -20,6 +20,10 @@ package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -39,12 +43,15 @@ import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestSQL;
 
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -55,14 +62,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -154,7 +166,7 @@ public class ITTestHoodieFlinkCompactor {
     // Mark instant as compaction inflight
     table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
 
-    env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstantTime, compactionPlan))))
+    env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstantTime, compactionPlan)), conf))
         .name("compaction_source")
         .uid("uid_compaction_source")
         .rebalance()
@@ -235,7 +247,7 @@ public class ITTestHoodieFlinkCompactor {
     // Mark instant as compaction inflight
     table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
 
-    env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstantTime, compactionPlan))))
+    env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstantTime, compactionPlan)), conf))
         .name("compaction_source")
         .uid("uid_compaction_source")
         .rebalance()
@@ -358,7 +370,7 @@ public class ITTestHoodieFlinkCompactor {
     }
     table.getMetaClient().reloadActiveTimeline();
 
-    env.addSource(new CompactionPlanSourceFunction(compactionPlans))
+    env.addSource(new CompactionPlanSourceFunction(compactionPlans, conf))
         .name("compaction_source")
         .uid("uid_compaction_source")
         .rebalance()
@@ -393,6 +405,111 @@ public class ITTestHoodieFlinkCompactor {
     TestData.checkWrittenDataCOW(tempFile, EXPECTED2);
   }
 
+  @Test
+  public void testOfflineCompactFailoverAfterCommit() {
+    TableEnvironment tableEnv = prepareEnvAndTable();
+
+    tableEnv.executeSql(TestSQL.INSERT_T1);
+
+    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+
+    assertDoesNotThrow(() -> runOfflineCompact(tableEnv, conf));
+    assertNoDuplicateFile(conf);
+  }
+
+  private void assertNoDuplicateFile(Configuration conf) {
+    Set<Pair<String, String>> fileIdCommitTimeSet = new HashSet<>();
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    HoodieWrapperFileSystem fs = metaClient.getFs();
+    FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT, metaClient.getBasePath(), false, false).forEach(
+        partition -> {
+          try {
+            Arrays.stream(fs.listStatus(FSUtils.getPartitionPath(metaClient.getBasePathV2(), partition)))
+                .filter(f -> FSUtils.isBaseFile(f.getPath()))
+                .forEach(f -> {
+                  HoodieBaseFile baseFile = new HoodieBaseFile(f);
+                  assertFalse(fileIdCommitTimeSet.contains(Pair.of(baseFile.getFileId(), baseFile.getCommitTime())));
+                  fileIdCommitTimeSet.add(Pair.of(baseFile.getFileId(), baseFile.getCommitTime()));
+                });
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        });
+    assertFalse(fileIdCommitTimeSet.isEmpty());
+  }
+
+  private TableEnvironment prepareEnvAndTable() {
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+    tableEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DML_SYNC, true);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.METADATA_ENABLED.key(), "false"); // to archive compaction instant
+    options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
+    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    return tableEnv;
+  }
+
+  /**
+   * Schedules compaction, inserts another batch, then runs the compaction plan.
+   */
+  private void runOfflineCompact(TableEnvironment tableEnv, Configuration conf) throws Exception {
+    conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    // set the table name
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    // infer changelog mode
+    CompactionUtil.inferChangelogMode(conf, metaClient);
+
+    HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf);
+
+    String compactionInstantTime = scheduleCompactionPlan(metaClient, writeClient);
+
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+    // generate compaction plan
+    // should support configurable commit metadata
+    HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
+        table.getMetaClient(), compactionInstantTime);
+
+    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+    // Mark instant as compaction inflight
+    table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
+
+    tableEnv.executeSql(TestSQL.INSERT_T1);
+
+    // Make configuration and setAvroSchema.
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(1)));
+
+    env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstantTime, compactionPlan)), conf))
+        .name("compaction_source")
+        .uid("uid_compaction_source")
+        .rebalance()
+        .transform("compact_task",
+            TypeInformation.of(CompactionCommitEvent.class),
+            new CompactOperator(conf))
+        .setParallelism(1)
+        .addSink(new CompactionCommitTestSink(conf))
+        .name("compaction_commit")
+        .uid("uid_compaction_commit")
+        .setParallelism(1);
+
+    env.execute("flink_hudi_compaction");
+  }
+
  private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {
     boolean scheduled = false;
     // judge whether there are any compaction operations.
