This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 95d8c6b9232 [HUDI-5569] Maintain commit timeline even in case of long
standing inflights (#8783)
95d8c6b9232 is described below
commit 95d8c6b9232b803c17482752ea409422447dc945
Author: Surya Prasanna <[email protected]>
AuthorDate: Tue Jun 13 17:28:50 2023 -0700
[HUDI-5569] Maintain commit timeline even in case of long standing
inflights (#8783)
- Maintain commit timeline even in case of long standing inflights
---
.../apache/hudi/client/HoodieTimelineArchiver.java | 48 ++++----
.../apache/hudi/io/TestHoodieTimelineArchiver.java | 124 ++++++++++++++++++++-
.../apache/hudi/common/util/ClusteringUtils.java | 7 +-
.../common/testutils/HoodieTestDataGenerator.java | 33 ++++++
.../hudi/hadoop/TestHoodieParquetInputFormat.java | 35 ++++++
5 files changed, 221 insertions(+), 26 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index c81f2d69779..0a6659cb1e7 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -413,15 +412,33 @@ public class HoodieTimelineArchiver<T extends
HoodieAvroPayload, I, K, O> {
// with logic above to avoid Stream.concat
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
- Option<HoodieInstant> oldestPendingCompactionAndReplaceInstant =
table.getActiveTimeline()
-
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMPACTION_ACTION,
HoodieTimeline.REPLACE_COMMIT_ACTION))
- .filter(s -> !s.isCompleted())
+ // Get the oldest inflight instant and a completed commit before this
inflight instant.
+ Option<HoodieInstant> oldestPendingInstant = table.getActiveTimeline()
+ .getWriteTimeline()
+ .filter(instant -> !instant.isCompleted())
.firstInstant();
- Option<HoodieInstant> oldestInflightCommitInstant =
- table.getActiveTimeline()
-
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.DELTA_COMMIT_ACTION))
- .filterInflights().firstInstant();
+ // Oldest commit to retain is the greatest completed commit that is less
than the oldest pending instant.
+ // In some cases when inflight is the lowest commit then oldest commit to
retain will be equal to oldest
+ // inflight commit.
+ Option<HoodieInstant> oldestCommitToRetain;
+ if (oldestPendingInstant.isPresent()) {
+ Option<HoodieInstant> completedCommitBeforeOldestPendingInstant =
+ Option.fromJavaOptional(commitTimeline.getReverseOrderedInstants()
+ .filter(instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
+ LESSER_THAN,
oldestPendingInstant.get().getTimestamp())).findFirst());
+ // Check if the completed instant is higher than the oldest inflight
instant
+ // in that case update the oldestCommitToRetain to oldestInflight commit
time.
+ if (!completedCommitBeforeOldestPendingInstant.isPresent()
+ ||
HoodieTimeline.compareTimestamps(oldestPendingInstant.get().getTimestamp(),
+ LESSER_THAN,
completedCommitBeforeOldestPendingInstant.get().getTimestamp())) {
+ oldestCommitToRetain = oldestPendingInstant;
+ } else {
+ oldestCommitToRetain = completedCommitBeforeOldestPendingInstant;
+ }
+ } else {
+ oldestCommitToRetain = Option.empty();
+ }
// NOTE: We cannot have any holes in the commit timeline.
// We cannot archive any commits which are made after the first savepoint
present,
@@ -460,19 +477,12 @@ public class HoodieTimelineArchiver<T extends
HoodieAvroPayload, I, K, O> {
return !(firstSavepoint.isPresent() &&
compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS,
s.getTimestamp()));
}
}).filter(s -> {
- // Ensure commits >= the oldest pending compaction/replace commit
is retained
- return oldestPendingCompactionAndReplaceInstant
+ // oldestCommitToRetain is the highest completed commit instant
that is less than the oldest inflight instant.
+ // By filtering out any commit >= oldestCommitToRetain, we can
ensure there are no gaps in the timeline
+ // when inflight commits are present.
+ return oldestCommitToRetain
.map(instant -> compareTimestamps(instant.getTimestamp(),
GREATER_THAN, s.getTimestamp()))
.orElse(true);
- }).filter(s -> {
- // We need this to ensure that when multiple writers are
performing conflict resolution, eligible instants don't
- // get archived, i.e, instants after the oldestInflight are
retained on the timeline
- if (config.getFailedWritesCleanPolicy() ==
HoodieFailedWritesCleaningPolicy.LAZY) {
- return oldestInflightCommitInstant.map(instant ->
- compareTimestamps(instant.getTimestamp(), GREATER_THAN,
s.getTimestamp()))
- .orElse(true);
- }
- return true;
}).filter(s ->
oldestInstantToRetainForCompaction.map(instantToRetain ->
compareTimestamps(s.getTimestamp(), LESSER_THAN,
instantToRetain.getTimestamp()))
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 73bfacfd9d7..b049c60b9e7 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -90,6 +90,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -913,19 +914,21 @@ public class TestHoodieTimelineArchiver extends
HoodieClientTestHarness {
if (i != 7) {
assertEquals(originalCommits, commitsAfterArchival);
} else {
- // on 7th commit, archival will kick in. but will archive only one
commit since 2nd compaction commit is inflight.
- assertEquals(originalCommits.size() - commitsAfterArchival.size(),
1);
+ // on 7th commit, archival will kick in, but cannot archive any
commit,
+ // since 1st deltacommit is the greatest completed commit before the
oldest inflight commit.
+ assertEquals(originalCommits.size() - commitsAfterArchival.size(),
0);
}
} else {
if (i != 7) {
assertEquals(originalCommits, commitsAfterArchival);
} else {
- // on 7th commit, archival will kick in. but will archive only one
commit since 2nd compaction commit is inflight.
- assertEquals(originalCommits.size() - commitsAfterArchival.size(),
1);
+ // on 7th commit, archival will kick in, but cannot archive any
commit,
+ // since 1st deltacommit is the greatest completed commit before the
oldest inflight commit.
+ assertEquals(originalCommits.size() - commitsAfterArchival.size(),
0);
for (int j = 1; j <= 7; j++) {
if (j == 1) {
- // first commit should be archived
- assertFalse(commitsAfterArchival.contains(new
HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" +
j)));
+ // first commit should not be archived
+ assertTrue(commitsAfterArchival.contains(new
HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" +
j)));
} else if (j == 2) {
// 2nd compaction should not be archived
assertFalse(commitsAfterArchival.contains(new
HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "0000000" +
j)));
@@ -1418,6 +1421,115 @@ public class TestHoodieTimelineArchiver extends
HoodieClientTestHarness {
}
}
+ /**
+ * Test archival functionality when there are inflight files.
+ * Archive should hold on to the greatest completed commit that is less than
the oldest inflight commit.
+ * @throws Exception
+ */
+ @Test
+ public void testGetCommitInstantsToArchiveDuringInflightCommits() throws
Exception {
+ HoodieWriteConfig cfg = initTestTableAndGetWriteConfig(false, 3, 4, 2);
+
+ Set<String> expectedInstants = new HashSet<>();
+ // Create 3 completed commits.
+ for (int i = 0; i < 3; i++) {
+ String instantTime = "100" + i;
+ HoodieTestDataGenerator.createCommitFile(basePath, instantTime,
wrapperFs.getConf());
+ expectedInstants.add(instantTime);
+ }
+ // Create an inflight file.
+ String replaceInstant = "1003";
+ HoodieTestDataGenerator.createReplaceCommitRequestedFile(basePath,
replaceInstant, wrapperFs.getConf());
+ expectedInstants.add(replaceInstant);
+ // Create 3 more instants
+ for (int i = 4; i < 7; i++) {
+ String instantTime = "100" + i;
+ HoodieTestDataGenerator.createCommitFile(basePath, instantTime,
wrapperFs.getConf());
+ expectedInstants.add(instantTime);
+ }
+ // Create another inflight commit
+ HoodieTestDataGenerator.createRequestedCommitFile(basePath, "1007",
wrapperFs.getConf());
+ HoodieTestDataGenerator.createPendingCommitFile(basePath, "1007",
wrapperFs.getConf());
+ expectedInstants.add("1007");
+ // Create 6 more instants
+ for (int i = 0; i < 6; i++) {
+ String instantTime = "101" + i;
+ HoodieTestDataGenerator.createCommitFile(basePath, instantTime,
wrapperFs.getConf());
+ expectedInstants.add(instantTime);
+ }
+ HoodieTimeline timeline =
metaClient.reloadActiveTimeline().getWriteTimeline();
+
+ // Check the count of instants.
+ assertEquals(expectedInstants.size(), timeline.countInstants(), "Loaded 14
commits and the count should match");
+
+ // Run archival
+ HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+ HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
+ boolean result = archiver.archiveIfRequired(context);
+ expectedInstants.remove("1000");
+ expectedInstants.remove("1001");
+ assertTrue(result);
+ timeline = metaClient.reloadActiveTimeline().getWriteTimeline();
+
+ // Check the count of instants; after archival it should have 2 fewer instants
+ // because the 1003 replacecommit's inflight will block archival.
+ assertEquals(12, timeline.countInstants(), "After archival only first 2
commits should be archived");
+ assertEquals(expectedInstants.size(), timeline.countInstants(), "After
archival only first 2 commits should be archived");
+
+ HoodieTimeline finalTimeline = timeline;
+ assertEquals(12, expectedInstants.stream().filter(instant ->
finalTimeline.containsInstant(instant)).count());
+ assertEquals("1002",
timeline.getInstantsAsStream().findFirst().get().getTimestamp());
+
+ // Delete replacecommit requested instant.
+ Path replaceCommitRequestedPath = new Path(
+ basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ + HoodieTimeline.makeRequestedReplaceFileName(replaceInstant));
+ metaClient.getFs().delete(replaceCommitRequestedPath);
+ metaClient.reloadActiveTimeline();
+
+ // Run archival
+ assertTrue(archiver.archiveIfRequired(context));
+ timeline = metaClient.reloadActiveTimeline().getWriteTimeline();
+ expectedInstants.removeAll(Arrays.asList("1002", "1003", "1004", "1005"));
+
+ // Check the count of instants; after archival it should have 4 fewer
instants.
+ // This time 1007 inflight commit will block archival.
+ assertEquals(8, timeline.countInstants(), "After archival only first 2
commits should be archived");
+ assertEquals(expectedInstants.size(), timeline.countInstants(), "After
archival only first 2 commits should be archived");
+ HoodieTimeline refreshedTimeline = timeline;
+ assertEquals(8, expectedInstants.stream().filter(instant ->
refreshedTimeline.containsInstant(instant)).count());
+ assertEquals("1006",
timeline.getInstantsAsStream().findFirst().get().getTimestamp());
+ }
+
+ /**
+ * If replacecommit inflight is the oldest commit in the timeline or for
that matter any inflight commit is present
+ * then the archival is blocked from there. This method test this scenario.
+ */
+ @Test
+ public void testWithOldestReplaceCommit() throws Exception {
+ HoodieWriteConfig cfg = initTestTableAndGetWriteConfig(false, 2, 3, 2);
+
+ HoodieTestDataGenerator.createReplaceCommitRequestedFile(basePath, "1001",
wrapperFs.getConf());
+ HoodieTestDataGenerator.createReplaceCommitInflightFile(basePath, "1001",
wrapperFs.getConf());
+ // Create 8 completed commits.
+ for (int i = 2; i < 10; i++) {
+ String instantTime = "100" + i;
+ HoodieTestDataGenerator.createCommitFile(basePath, instantTime,
wrapperFs.getConf());
+ }
+
+ HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+ HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
+
+ HoodieTimeline timeline = metaClient.reloadActiveTimeline();
+ assertEquals(9, timeline.countInstants(), "Loaded 9 commits and the count
should match");
+ boolean result = archiver.archiveIfRequired(context);
+ assertTrue(result);
+ timeline = metaClient.reloadActiveTimeline();
+ assertEquals(9, timeline.countInstants(),
+ "Since we have a pending replacecommit at 1001, we should never
archive any commit after 1001");
+ assertEquals("1001",
timeline.getInstantsAsStream().findFirst().get().getTimestamp());
+ }
+
@Test
public void testArchivalAndCompactionInMetadataTable() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index fae8362a744..dc6de394bf3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -256,7 +256,12 @@ public class ClusteringUtils {
retainLowerBound = earliestInstantToRetain.getTimestamp();
} else {
// no earliestInstantToRetain, indicate KEEP_LATEST_FILE_VERSIONS
clean policy,
- // retain first instant after clean instant
+ // retain first instant after clean instant.
+ // For KEEP_LATEST_FILE_VERSIONS cleaner policy, file versions are
only maintained for active file groups
+ // not for replaced file groups. So, last clean instant can be
considered as a lower bound, since
+ // the cleaner would have removed all the file groups until then.
But there is a catch to this logic,
+ // while cleaner is running if there is a pending replacecommit then
those files are not cleaned.
+ // TODO: This case has to be handled. HUDI-6352
retainLowerBound = cleanInstant.getTimestamp();
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 96b22b14990..80fb90f6fe3 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -493,6 +493,18 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
return rec;
}
+ public static void createRequestedCommitFile(String basePath, String
instantTime, Configuration configuration) throws IOException {
+ Path pendingRequestedFile = new Path(basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ + HoodieTimeline.makeRequestedCommitFileName(instantTime));
+ createEmptyFile(basePath, pendingRequestedFile, configuration);
+ }
+
+ public static void createPendingCommitFile(String basePath, String
instantTime, Configuration configuration) throws IOException {
+ Path pendingCommitFile = new Path(basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ + HoodieTimeline.makeInflightCommitFileName(instantTime));
+ createEmptyFile(basePath, pendingCommitFile, configuration);
+ }
+
public static void createCommitFile(String basePath, String instantTime,
Configuration configuration) {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
createCommitFile(basePath, instantTime, configuration, commitMetadata);
@@ -534,6 +546,20 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
}
}
+ public static void createReplaceCommitRequestedFile(String basePath, String
instantTime, Configuration configuration)
+ throws IOException {
+ Path commitFile = new Path(basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ + HoodieTimeline.makeRequestedReplaceFileName(instantTime));
+ createEmptyFile(basePath, commitFile, configuration);
+ }
+
+ public static void createReplaceCommitInflightFile(String basePath, String
instantTime, Configuration configuration)
+ throws IOException {
+ Path commitFile = new Path(basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ + HoodieTimeline.makeInflightReplaceFileName(instantTime));
+ createEmptyFile(basePath, commitFile, configuration);
+ }
+
private static void createPendingReplaceFile(String basePath, String
instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) {
Arrays.asList(HoodieTimeline.makeInflightReplaceFileName(instantTime),
HoodieTimeline.makeRequestedReplaceFileName(instantTime))
@@ -558,6 +584,13 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
os.close();
}
+ public static void createCompactionRequestedFile(String basePath, String
instantTime, Configuration configuration)
+ throws IOException {
+ Path commitFile = new Path(basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ + HoodieTimeline.makeRequestedCompactionFileName(instantTime));
+ createEmptyFile(basePath, commitFile, configuration);
+ }
+
public static void createCompactionAuxiliaryMetadata(String basePath,
HoodieInstant instant,
Configuration
configuration) throws IOException {
Path commitFile =
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 32690f12096..ab1a7a4551c 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -694,6 +694,41 @@ public class TestHoodieParquetInputFormat {
ensureFilesInCommit("Pulling 1 commit from 100, should get us the 10 files
committed at 100", files, "100", 10);
}
+ /**
+ * Test scenario where inflight commit is between completed commits.
+ */
+ @Test
+ public void testSnapshotPreCommitValidateWithInflights() throws IOException {
+ // Create commit and data files with commit 000
+ File partitionDir = InputFormatTestUtil.prepareTable(basePath,
baseFileFormat, 5, "000");
+ createCommitFile(basePath, "000", "2016/05/01");
+
+ // create inflight commit add more files with same file_id.
+ InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension,
"fileId1", 5, "100");
+ FileCreateUtils.createInflightCommit(basePath.toString(), "100");
+
+ // Create another commit without datafiles.
+ createCommitFile(basePath, "200", "2016/05/01");
+
+ // Add the paths
+ FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+ // Now, the original data files with commit time 000 should be returned.
+ FileStatus[] files = inputFormat.listStatus(jobConf);
+ assertEquals(5, files.length, "Snapshot read must return all files in
partition");
+ ensureFilesInCommit("Should return base files from commit 000, inflight
data files with "
+ + "greater timestamp should be filtered", files, "000", 5);
+
+ // Create data files with same file_id for commit 200.
+ InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension,
"fileId1", 5, "200");
+
+ // This time data files from commit time 200 will be returned.
+ files = inputFormat.listStatus(jobConf);
+ assertEquals(5, files.length, "Snapshot read must return all files in
partition");
+ ensureFilesInCommit("Only completed commits files should be returned.",
+ files, "200", 5);
+ }
+
private void ensureRecordsInCommit(String msg, String commit, int
expectedNumberOfRecordsInCommit,
int totalExpected) throws IOException {
int actualCount = 0;