This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cc33f53c794 [HUDI-6366] Prevent flink offline table service rerun completed instant (#8950)
cc33f53c794 is described below
commit cc33f53c794a3295044daacaaed900c77274bbba
Author: Bingeng Huang <[email protected]>
AuthorDate: Fri Jun 16 00:16:10 2023 +0800
[HUDI-6366] Prevent flink offline table service rerun completed instant (#8950)
---------
Co-authored-by: hbg <[email protected]>
---
.../clustering/ClusteringPlanSourceFunction.java | 17 ++-
.../sink/clustering/HoodieFlinkClusteringJob.java | 2 +-
.../sink/compact/CompactionPlanSourceFunction.java | 11 +-
.../hudi/sink/compact/HoodieFlinkCompactor.java | 2 +-
.../sink/cluster/ClusteringCommitTestSink.java | 47 ++++++++
.../sink/cluster/ITTestHoodieFlinkClustering.java | 132 ++++++++++++++++++++-
.../sink/compact/CompactionCommitTestSink.java | 54 +++++++++
.../sink/compact/ITTestHoodieFlinkCompactor.java | 123 ++++++++++++++++++-
8 files changed, 375 insertions(+), 13 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
index fafaf9a1ce9..ed78e33c10f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
@@ -60,9 +61,12 @@ public class ClusteringPlanSourceFunction extends AbstractRichFunction implement
*/
private final String clusteringInstantTime;
- public ClusteringPlanSourceFunction(String clusteringInstantTime, HoodieClusteringPlan clusteringPlan) {
+ private final Configuration conf;
+
+ public ClusteringPlanSourceFunction(String clusteringInstantTime, HoodieClusteringPlan clusteringPlan, Configuration conf) {
this.clusteringInstantTime = clusteringInstantTime;
this.clusteringPlan = clusteringPlan;
+ this.conf = conf;
}
@Override
@@ -72,9 +76,14 @@ public class ClusteringPlanSourceFunction extends AbstractRichFunction implement
@Override
public void run(SourceContext<ClusteringPlanEvent> sourceContext) throws Exception {
- for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
- LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size());
- sourceContext.collect(new ClusteringPlanEvent(this.clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams()));
+ boolean isPending = StreamerUtil.createMetaClient(conf).getActiveTimeline().filterPendingReplaceTimeline().containsInstant(clusteringInstantTime);
+ if (isPending) {
+ for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
+ LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size());
+ sourceContext.collect(new ClusteringPlanEvent(this.clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams()));
+ }
+ } else {
+ LOG.warn(clusteringInstantTime + " not found in pending clustering instants.");
}
}
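The guard added above reduces to a single timeline check. A minimal standalone sketch using only the Hudi calls visible in this hunk (the helper class name is illustrative, not part of the patch):

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.util.StreamerUtil;

final class ClusteringGuardSketch {
  // True only while the clustering instant is still pending (requested or inflight);
  // a completed or missing instant fails the check and the plan is skipped.
  static boolean isStillPending(Configuration conf, String clusteringInstantTime) {
    return StreamerUtil.createMetaClient(conf)
        .getActiveTimeline()
        .filterPendingReplaceTimeline()
        .containsInstant(clusteringInstantTime);
  }
}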
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index 223f85defca..a47dfe39586 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -330,7 +330,7 @@ public class HoodieFlinkClusteringJob {
long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
- DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstant.getTimestamp(), clusteringPlan))
+ DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstant.getTimestamp(), clusteringPlan, conf))
.name("clustering_source")
.uid("uid_clustering_source")
.rebalance()
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
index 883ba8bd114..6e48c0b386c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
@@ -20,7 +20,9 @@ package org.apache.hudi.sink.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
@@ -56,9 +58,11 @@ public class CompactionPlanSourceFunction extends AbstractRichFunction implement
* compaction plan instant -> compaction plan
*/
private final List<Pair<String, HoodieCompactionPlan>> compactionPlans;
+ private final Configuration conf;
- public CompactionPlanSourceFunction(List<Pair<String, HoodieCompactionPlan>> compactionPlans) {
+ public CompactionPlanSourceFunction(List<Pair<String, HoodieCompactionPlan>> compactionPlans, Configuration conf) {
this.compactionPlans = compactionPlans;
+ this.conf = conf;
}
@Override
@@ -68,7 +72,12 @@ public class CompactionPlanSourceFunction extends AbstractRichFunction implement
@Override
public void run(SourceContext sourceContext) throws Exception {
+ HoodieTimeline pendingCompactionTimeline = StreamerUtil.createMetaClient(conf).getActiveTimeline().filterPendingCompactionTimeline();
for (Pair<String, HoodieCompactionPlan> pair : compactionPlans) {
+ if (!pendingCompactionTimeline.containsInstant(pair.getLeft())) {
+ LOG.warn(pair.getLeft() + " not found in pending compaction instants.");
+ continue;
+ }
HoodieCompactionPlan compactionPlan = pair.getRight();
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
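The compaction source applies the same guard per plan inside the loop; equivalently, the plan list could be filtered once up front. A sketch under the same assumptions (helper class name is illustrative):

import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.util.StreamerUtil;

final class CompactionGuardSketch {
  // Keeps only plans whose instant is still pending; instants that were already
  // committed (or archived away) drop out instead of being re-executed.
  static List<Pair<String, HoodieCompactionPlan>> pendingOnly(
      Configuration conf, List<Pair<String, HoodieCompactionPlan>> plans) {
    HoodieTimeline pending = StreamerUtil.createMetaClient(conf)
        .getActiveTimeline()
        .filterPendingCompactionTimeline();
    return plans.stream()
        .filter(p -> pending.containsInstant(p.getLeft()))
        .collect(Collectors.toList());
  }
}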
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index e396897dc7e..74fe7929607 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -290,7 +290,7 @@ public class HoodieFlinkCompactor {
table.getMetaClient().reloadActiveTimeline();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.addSource(new CompactionPlanSourceFunction(compactionPlans))
+ env.addSource(new CompactionPlanSourceFunction(compactionPlans, conf))
.name("compaction_source")
.uid("uid_compaction_source")
.rebalance()
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ClusteringCommitTestSink.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ClusteringCommitTestSink.java
new file mode 100644
index 00000000000..96f8d7423a1
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ClusteringCommitTestSink.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.cluster;
+
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.List;
+
+/**
+ * ClusteringCommitTestSink, throws on the first attempt after the clustering commit to simulate a failover in tests.
+ */
+public class ClusteringCommitTestSink extends ClusteringCommitSink {
+ public ClusteringCommitTestSink(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public void invoke(ClusteringCommitEvent event, Context context) throws Exception {
+ super.invoke(event, context);
+ List<HoodieInstant> instants = writeClient.getHoodieTable().getMetaClient().getActiveTimeline().getInstants();
+ boolean committed = instants.stream().anyMatch(i -> i.getTimestamp().equals(event.getInstant()) && i.isCompleted());
+ if (committed && getRuntimeContext().getAttemptNumber() == 0) {
+ throw new HoodieException("Fail first attempt to simulate failover in test.");
+ }
+ }
+}
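The sink above is a fail-once pattern: do the real commit, then throw only on attempt 0 so Flink's restart strategy replays the pipeline against the now-completed instant. A generic sketch of the pattern (class name and exception are illustrative; the test extends ClusteringCommitSink, so the commit itself still happens first):

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class FailOnceSink<T> extends RichSinkFunction<T> {
  @Override
  public void invoke(T value, Context context) throws Exception {
    // Attempt numbers start at 0, so only the first execution attempt fails;
    // the restarted attempt then verifies that re-processing is idempotent.
    if (getRuntimeContext().getAttemptNumber() == 0) {
      throw new RuntimeException("Fail first attempt to simulate failover in test.");
    }
  }
}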
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index 8f7fcf19fde..18a8aebb8fd 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.cluster;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -47,21 +48,29 @@ import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
import org.apache.avro.Schema;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
@@ -74,6 +83,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -174,7 +184,7 @@ public class ITTestHoodieFlinkClustering {
final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
- DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan))
+ DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan, conf))
.name("clustering_source")
.uid("uid_clustering_source")
.rebalance()
@@ -383,7 +393,7 @@ public class ITTestHoodieFlinkClustering {
final RowType rowType = (RowType) rowDataType.getLogicalType();
DataStream<ClusteringCommitEvent> dataStream =
- env.addSource(new ClusteringPlanSourceFunction(firstClusteringInstant, clusteringPlan))
+ env.addSource(new ClusteringPlanSourceFunction(firstClusteringInstant, clusteringPlan, conf))
.name("clustering_source")
.uid("uid_clustering_source")
.rebalance()
@@ -574,7 +584,7 @@ public class ITTestHoodieFlinkClustering {
HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
- DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan))
+ DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan, conf))
.name("clustering_source")
.uid("uid_clustering_source")
.rebalance()
@@ -602,4 +612,120 @@ public class ITTestHoodieFlinkClustering {
expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4, id8,par4,id8,Han,56,8100001,par4]");
TestData.checkWrittenData(tempFile, expected, 4);
}
+
+ @Test
+ public void testOfflineClusterFailoverAfterCommit() throws Exception {
+ StreamTableEnvironment tableEnv = prepareEnvAndTable();
+
+ FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+ cfg.path = tempFile.getAbsolutePath();
+ cfg.targetPartitions = 4;
+ Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+ assertDoesNotThrow(() -> runOfflineCluster(tableEnv, conf));
+
+ Table result = tableEnv.sqlQuery("select count(*) from t1");
+ assertEquals(16L, tableEnv.toDataStream(result, Row.class).executeAndCollect(1).get(0).getField(0));
+ }
+
+ private StreamTableEnvironment prepareEnvAndTable() {
+ // Create hoodie table and insert into data.
+ Configuration conf = new org.apache.flink.configuration.Configuration();
+ conf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ tEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+ tEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DML_SYNC, true);
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+ // use append mode
+ options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
+ options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+ options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name());
+
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+ tEnv.executeSql(hoodieTableDDL);
+ tEnv.executeSql(TestSQL.INSERT_T1);
+ return tEnv;
+ }
+
+ /**
+ * schedule clustering, insert another batch, run clustering.
+ */
+ private void runOfflineCluster(TableEnvironment tableEnv, Configuration conf) throws Exception {
+ // Make configuration and setAvroSchema.
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(1)));
+
+ // create metaClient
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+ // set the table name
+ conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+ // set record key field
+ conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
+ // set partition field
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
+
+ long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+ conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+
+ // set table schema
+ CompactionUtil.setAvroSchema(conf, metaClient);
+
+ // judge whether there is an operation to schedule;
+ // compute the clustering instant time and do the clustering
+ String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+ HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf);
+ HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+ boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+
+ assertTrue(scheduled, "The clustering plan should be scheduled");
+
+ tableEnv.executeSql(TestSQL.INSERT_T1);
+
+ // fetch the instant based on the configured execution sequence
+ table.getMetaClient().reloadActiveTimeline();
+ HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
+ .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
+
+ // generate clustering plan
+ // should support configurable commit metadata
+ Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
+ table.getMetaClient(), timeline.lastInstant().get());
+
+ HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
+
+ // Mark instant as clustering inflight
+ HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
+ table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
+
+ final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
+ final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+ final RowType rowType = (RowType) rowDataType.getLogicalType();
+
+ DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan, conf))
+ .name("clustering_source")
+ .uid("uid_clustering_source")
+ .rebalance()
+ .transform("clustering_task",
+ TypeInformation.of(ClusteringCommitEvent.class),
+ new ClusteringOperator(conf, rowType))
+ .setParallelism(clusteringPlan.getInputGroups().size());
+
+ ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+ conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+
+ dataStream
+ .addSink(new ClusteringCommitTestSink(conf))
+ .name("clustering_commit")
+ .uid("uid_clustering_commit")
+ .setParallelism(1);
+
+ env.execute("flink_hudi_clustering");
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/CompactionCommitTestSink.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/CompactionCommitTestSink.java
new file mode 100644
index 00000000000..28e75871b9d
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/CompactionCommitTestSink.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import org.apache.hudi.client.HoodieTimelineArchiver;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.List;
+
+/**
+ * CompactionCommitTestSink, throws on the first attempt to simulate a failure after the compaction commit.
+ */
+public class CompactionCommitTestSink extends CompactionCommitSink {
+ public CompactionCommitTestSink(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public void invoke(CompactionCommitEvent event, Context context) throws Exception {
+ super.invoke(event, context);
+ List<HoodieInstant> instants = writeClient.getHoodieTable().getMetaClient().getActiveTimeline().getInstants();
+ boolean compactCommitted = instants.stream().anyMatch(i -> i.getTimestamp().equals(event.getInstant()) && i.isCompleted());
+ if (compactCommitted && getRuntimeContext().getAttemptNumber() == 0) {
+ // archive compact instant
+ this.writeClient.getConfig().setValue(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP, "1");
+ this.writeClient.getConfig().setValue(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP, "1");
+ HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(this.writeClient.getConfig(), this.writeClient.getHoodieTable());
+ this.writeClient.getHoodieTable().getMetaClient().reloadActiveTimeline();
+ archiver.archiveIfRequired(HoodieFlinkEngineContext.DEFAULT);
+ throw new HoodieException("Fail first attempt to simulate failover in test.");
+ }
+ }
+ }
+}
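The compaction variant also archives the just-committed instant before throwing, so on the retried attempt the instant is absent from the active timeline entirely and the source's guard must skip it rather than fail. The retry itself is driven by the restart strategy both integration tests configure; a minimal sketch of that wiring (the exact calls appear in the test code):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final class RestartWiringSketch {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // One retry with a 1 ms delay: attempt 0 commits and throws; attempt 1 replays
    // the plan source, whose pending-instant guard now skips the finished instant.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(1)));
  }
}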
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index bf5c13f366c..b032ad46765 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -20,6 +20,10 @@ package org.apache.hudi.sink.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -39,12 +43,15 @@ import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -55,14 +62,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -154,7 +166,7 @@ public class ITTestHoodieFlinkCompactor {
// Mark instant as compaction inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
- env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstantTime, compactionPlan))))
+ env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstantTime, compactionPlan)), conf))
.name("compaction_source")
.uid("uid_compaction_source")
.rebalance()
@@ -235,7 +247,7 @@ public class ITTestHoodieFlinkCompactor {
// Mark instant as compaction inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
- env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstantTime, compactionPlan))))
+ env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstantTime, compactionPlan)), conf))
.name("compaction_source")
.uid("uid_compaction_source")
.rebalance()
@@ -358,7 +370,7 @@ public class ITTestHoodieFlinkCompactor {
}
table.getMetaClient().reloadActiveTimeline();
- env.addSource(new CompactionPlanSourceFunction(compactionPlans))
+ env.addSource(new CompactionPlanSourceFunction(compactionPlans, conf))
.name("compaction_source")
.uid("uid_compaction_source")
.rebalance()
@@ -393,6 +405,111 @@ public class ITTestHoodieFlinkCompactor {
TestData.checkWrittenDataCOW(tempFile, EXPECTED2);
}
+ @Test
+ public void testOfflineCompactFailoverAfterCommit() {
+ TableEnvironment tableEnv = prepareEnvAndTable();
+
+ tableEnv.executeSql(TestSQL.INSERT_T1);
+
+ FlinkCompactionConfig cfg = new FlinkCompactionConfig();
+ cfg.path = tempFile.getAbsolutePath();
+ Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+
+ assertDoesNotThrow(() -> runOfflineCompact(tableEnv, conf));
+ assertNoDuplicateFile(conf);
+ }
+
+ private void assertNoDuplicateFile(Configuration conf) {
+ Set<Pair<String, String>> fileIdCommitTimeSet = new HashSet<>();
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+ HoodieWrapperFileSystem fs = metaClient.getFs();
+ FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT, metaClient.getBasePath(), false, false).forEach(
+ partition -> {
+ try {
+ Arrays.stream(fs.listStatus(FSUtils.getPartitionPath(metaClient.getBasePathV2(), partition)))
+ .filter(f -> FSUtils.isBaseFile(f.getPath()))
+ .forEach(f -> {
+ HoodieBaseFile baseFile = new HoodieBaseFile(f);
+ assertFalse(fileIdCommitTimeSet.contains(Pair.of(baseFile.getFileId(), baseFile.getCommitTime())));
+ fileIdCommitTimeSet.add(Pair.of(baseFile.getFileId(), baseFile.getCommitTime()));
+ });
+ });
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ assertFalse(fileIdCommitTimeSet.isEmpty());
+ }
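A side note on the duplicate check above: java.util.Set.add already returns whether the element was new, so the contains-then-add pair could collapse into a single assertion. A minimal sketch (the helper class is illustrative, not part of the patch):

import java.util.HashSet;
import java.util.Set;

final class DuplicateProbe {
  private final Set<String> seen = new HashSet<>();

  // Returns true the first time a (fileId, commitTime) pair is observed; a rerun
  // compaction would produce a second base file with the same pair and trip this.
  boolean firstSighting(String fileId, String commitTime) {
    return seen.add(fileId + "@" + commitTime);
  }
}

With such a helper, the loop body would reduce to assertTrue(probe.firstSighting(baseFile.getFileId(), baseFile.getCommitTime())).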
+
+ private TableEnvironment prepareEnvAndTable() {
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+ tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+ tableEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DML_SYNC, true);
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.METADATA_ENABLED.key(), "false"); // to archive compaction instant
+ options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
+ options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+ tableEnv.executeSql(hoodieTableDDL);
+ return tableEnv;
+ }
+
+ /**
+ * schedule compact, insert another batch, run compact.
+ */
+ private void runOfflineCompact(TableEnvironment tableEnv, Configuration conf) throws Exception {
+ conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+
+ // create metaClient
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+ // set the table name
+ conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+ // set table schema
+ CompactionUtil.setAvroSchema(conf, metaClient);
+
+ // infer changelog mode
+ CompactionUtil.inferChangelogMode(conf, metaClient);
+
+ HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf);
+
+ String compactionInstantTime = scheduleCompactionPlan(metaClient, writeClient);
+
+ HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+ // generate compaction plan
+ // should support configurable commit metadata
+ HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
+ table.getMetaClient(), compactionInstantTime);
+
+ HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+ // Mark instant as compaction inflight
+ table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
+
+ tableEnv.executeSql(TestSQL.INSERT_T1);
+
+ // Make configuration and setAvroSchema.
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(1)));
+
+ env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstantTime, compactionPlan)), conf))
+ .name("compaction_source")
+ .uid("uid_compaction_source")
+ .rebalance()
+ .transform("compact_task",
+ TypeInformation.of(CompactionCommitEvent.class),
+ new CompactOperator(conf))
+ .setParallelism(1)
+ .addSink(new CompactionCommitTestSink(conf))
+ .name("compaction_commit")
+ .uid("uid_compaction_commit")
+ .setParallelism(1);
+
+ env.execute("flink_hudi_compaction");
+ }
+
private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {
boolean scheduled = false;
// judge whether there are any compaction operations.