This is an automated email from the ASF dual-hosted git repository. nagarwal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 5ce64a8  Fix the filter condition is missing in the judgment condition of compaction instance (#3025)
5ce64a8 is described below

commit 5ce64a81bdfaa94aefacf7b365e6fb044763827c
Author: swuferhong <337361...@qq.com>
AuthorDate: Thu Jun 17 05:28:53 2021 +0800

    Fix the filter condition is missing in the judgment condition of compaction instance (#3025)
---
 .../BaseScheduleCompactionActionExecutor.java      |  2 +-
 .../table/action/compact/TestHoodieCompactor.java  | 24 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/BaseScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/BaseScheduleCompactionActionExecutor.java
index b744a7e..25c2fec 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/BaseScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/BaseScheduleCompactionActionExecutor.java
@@ -63,7 +63,7 @@ public abstract class BaseScheduleCompactionActionExecutor<T extends HoodieRecor
           + ", Compaction scheduled at " + instantTime));
       // Committed and pending compaction instants should have strictly lower timestamps
       List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
-          .getWriteTimeline().getInstants()
+          .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
           .filter(instant -> HoodieTimeline.compareTimestamps(
               instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
           .collect(Collectors.toList());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 734fcc2..2706dce 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -133,6 +133,30 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   }
 
   @Test
+  public void testScheduleCompactionWithInflightInstant() {
+    HoodieWriteConfig config = getConfig();
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+      // insert 100 records.
+      String newCommitTime = "100";
+      writeClient.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+      writeClient.insert(recordsRDD, newCommitTime).collect();
+
+      // create one inflight instance.
+      newCommitTime = "102";
+      writeClient.startCommitWithTime(newCommitTime);
+      metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
+              HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
+
+      // create one compaction instance before exist inflight instance.
+      String compactionTime = "101";
+      writeClient.scheduleCompactionAtInstant(compactionTime, Option.empty());
+    }
+  }
+
+  @Test
   public void testWriteStatusContentsAfterCompaction() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig();