This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 44ca2bbcfd1 [HUDI-6992] IncrementalInputSplits incorrectly set the
latestCommit attribute (#9923)
44ca2bbcfd1 is described below
commit 44ca2bbcfd1512a55155e1033a9c9aca132efae6
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Thu Nov 9 15:04:50 2023 +0800
[HUDI-6992] IncrementalInputSplits incorrectly set the latestCommit
attribute (#9923)
---
.../org/apache/hudi/common/model/FileSlice.java | 13 ++++--
.../table/timeline/CompletionTimeQueryView.java | 7 +--
.../hudi/common/table/timeline/HoodieTimeline.java | 14 ++++++
.../apache/hudi/common/model/TestFileSlice.java | 50 ++++++++++++++++++++++
.../apache/hudi/source/IncrementalInputSplits.java | 7 ++-
.../hudi/source/TestIncrementalInputSplits.java | 47 ++++++++++++++++++++
.../source/TestStreamReadMonitoringFunction.java | 12 +++---
7 files changed, 134 insertions(+), 16 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
index 3f0fcf94156..d071385ea75 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.model;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import java.io.Serializable;
@@ -123,9 +124,15 @@ public class FileSlice implements Serializable {
}
/**
- * Returns true if there is no data file and no log files. Happens as part
of pending compaction
- *
- * @return
+ * Returns the latest instant time of the file slice.
+ */
+ public String getLatestInstantTime() {
+ Option<String> latestDeltaCommitTime =
getLatestLogFile().map(HoodieLogFile::getDeltaCommitTime);
+ return latestDeltaCommitTime.isPresent() ?
HoodieTimeline.maxInstant(latestDeltaCommitTime.get(), getBaseInstantTime()) :
getBaseInstantTime();
+ }
+
+ /**
+ * Returns true if there is no data file and no log files. Happens as part
of pending compaction.
*/
public boolean isEmpty() {
return (baseFile == null) && (logFiles.isEmpty());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
index 081cae8cb15..e53f185bffd 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.COMPLETION_TIME_ARCHIVED_META_FIELD;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
-import static
org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
/**
* Query view for instant completion time.
@@ -81,7 +80,7 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String
cursorInstant) {
this.metaClient = metaClient;
this.startToCompletionInstantTimeMap = new ConcurrentHashMap<>();
- this.cursorInstant = minInstant(cursorInstant,
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse(""));
+ this.cursorInstant = HoodieTimeline.minInstant(cursorInstant,
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse(""));
// Note: use getWriteTimeline() to keep sync with the fs view
visibleCommitsAndCompactionTimeline, see
AbstractTableFileSystemView.refreshTimeline.
this.firstNonSavepointCommit =
metaClient.getActiveTimeline().getWriteTimeline().getFirstNonSavepointCommit().map(HoodieInstant::getTimestamp).orElse("");
load();
@@ -207,10 +206,6 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
this.startToCompletionInstantTimeMap.putIfAbsent(instantTime,
completionTime);
}
- private static String minInstant(String instant1, String instant2) {
- return compareTimestamps(instant1, LESSER_THAN, instant2) ? instant1 :
instant2;
- }
-
public String getCursorInstant() {
return cursorInstant;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 82ec439bd25..53c7d25a00c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -415,6 +415,20 @@ public interface HoodieTimeline extends Serializable {
return predicateToApply.test(commit1, commit2);
}
+ /**
+ * Returns the smaller of the given two instants.
+ */
+ static String minInstant(String instant1, String instant2) {
+ return compareTimestamps(instant1, LESSER_THAN, instant2) ? instant1 :
instant2;
+ }
+
+ /**
+ * Returns the greater of the given two instants.
+ */
+ static String maxInstant(String instant1, String instant2) {
+ return compareTimestamps(instant1, GREATER_THAN, instant2) ? instant1 :
instant2;
+ }
+
/**
* Return true if specified timestamp is in range (startTs, endTs].
*/
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java
new file mode 100644
index 00000000000..376696bdfa9
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests {@link FileSlice}.
+ */
+public class TestFileSlice {
+ @Test
+ void testGetLatestInstantTime() {
+ String baseInstant = "003";
+ String deltaInstant2 = "002";
+ String deltaInstant4 = "004";
+
+ FileSlice fileSlice = new FileSlice("par1", baseInstant, "fg1");
+ assertThat(fileSlice.getLatestInstantTime(), is(baseInstant));
+
+ fileSlice.addLogFile(new HoodieLogFile(new
Path(getLogFileName(deltaInstant2))));
+ assertThat(fileSlice.getLatestInstantTime(), is(baseInstant));
+
+ fileSlice.addLogFile(new HoodieLogFile(new
Path(getLogFileName(deltaInstant4))));
+ assertThat(fileSlice.getLatestInstantTime(), is(deltaInstant4));
+ }
+
+ private static String getLogFileName(String instantTime) {
+ return ".fg1_" + instantTime + ".log.1_1-0-1";
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index d3391141834..2cc96cb8ae6 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -57,6 +57,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -453,11 +454,15 @@ public class IncrementalInputSplits implements
Serializable {
.filter(logPath ->
!logPath.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
.collect(Collectors.toList()));
String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+          // the latest commit is used as the upper threshold of the log reader instant range;
+          // it must be at least the latest instant time of the file slice to avoid data loss.
+ String latestCommit =
HoodieTimeline.minInstant(fileSlice.getLatestInstantTime(), endInstant);
return new MergeOnReadInputSplit(cnt.getAndAdd(1),
- basePath, logPaths, endInstant,
+ basePath, logPaths, latestCommit,
metaClient.getBasePath(), maxCompactionMemoryInBytes,
mergeType, instantRange, fileSlice.getFileId());
}).collect(Collectors.toList()))
.flatMap(Collection::stream)
+ .sorted(Comparator.comparing(MergeOnReadInputSplit::getLatestCommit))
.collect(Collectors.toList());
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index 2dff6416a29..92766186065 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -18,16 +18,19 @@
package org.apache.hudi.source;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -37,6 +40,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.hadoop.fs.FileStatus;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -49,6 +53,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -58,6 +63,7 @@ import static
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
import static
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test cases for {@link IncrementalInputSplits}.
@@ -229,6 +235,47 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
assertEquals(1, intervalBetween2Instants(commitsTimeline, minStartCommit,
maxEndCommit));
}
+ @Test
+ void testInputSplitsForSplitLastCommit() throws Exception {
+ Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+ conf.set(FlinkOptions.READ_START_COMMIT,
FlinkOptions.START_COMMIT_EARLIEST);
+ conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
+ conf.set(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true);
+ conf.set(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
+
+ // insert data
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ HoodieTimeline commitsTimeline =
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ List<HoodieInstant> instants = commitsTimeline.getInstants();
+ String lastInstant =
commitsTimeline.lastInstant().map(HoodieInstant::getTimestamp).get();
+ List<HoodieCommitMetadata> metadataList = instants.stream()
+ .map(instant -> WriteProfiles.getCommitMetadata(tableName, new
Path(basePath), instant, commitsTimeline)).collect(Collectors.toList());
+ FileStatus[] fileStatuses = WriteProfiles.getFilesFromMetadata(new
Path(basePath), metaClient.getHadoopConf(), metadataList,
metaClient.getTableType());
+ HoodieTableFileSystemView fileSystemView =
+ new HoodieTableFileSystemView(metaClient, commitsTimeline,
fileStatuses);
+ Map<String, String> fileIdToBaseInstant =
fileSystemView.getAllFileSlices("par1")
+ .collect(Collectors.toMap(FileSlice::getFileId,
FileSlice::getBaseInstantTime));
+
+ IncrementalInputSplits iis = IncrementalInputSplits.builder()
+ .conf(conf)
+ .path(new Path(basePath))
+ .rowType(TestConfigurations.ROW_TYPE)
+ .partitionPruner(null)
+ .build();
+ IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null,
null, false);
+ result.getInputSplits().stream().filter(split ->
fileIdToBaseInstant.containsKey(split.getFileId()))
+ .forEach(split ->
assertEquals(fileIdToBaseInstant.get(split.getFileId()),
split.getLatestCommit()));
+    assertTrue(result.getInputSplits().stream().anyMatch(split -> split.getLatestCommit().equals(lastInstant)),
+        "Some input splits' latest commit time should be equal to the last instant");
+    assertTrue(result.getInputSplits().stream().anyMatch(split -> !split.getLatestCommit().equals(lastInstant)),
+        "The input split's latest commit time does not always equal the last instant");
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 81bb6f03477..6edeceae0d8 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -253,8 +253,8 @@ public class TestStreamReadMonitoringFunction {
sourceContext.getPartitionPaths(), is("par1"));
assertTrue(sourceContext.splits.stream().noneMatch(split ->
split.getInstantRange().isPresent()),
"No instants should have range limit");
- assertTrue(sourceContext.splits.stream().allMatch(split ->
split.getLatestCommit().equals(c4)),
- "All the splits should be with specified instant time");
+ assertTrue(sourceContext.splits.stream().anyMatch(split ->
split.getLatestCommit().equals(c4)),
+ "At least one input split's latest commit time should be equal to
the specified instant time.");
// reset the source context
latch = new CountDownLatch(1);
@@ -270,8 +270,8 @@ public class TestStreamReadMonitoringFunction {
"All instants should have range limit");
assertTrue(sourceContext.splits.stream().allMatch(split ->
isPointInstantRange(split.getInstantRange().get(), c2)),
"All the splits should have point instant range");
- assertTrue(sourceContext.splits.stream().allMatch(split ->
split.getLatestCommit().equals(c2)),
- "All the splits should be with specified instant time");
+ assertTrue(sourceContext.splits.stream().anyMatch(split ->
split.getLatestCommit().equals(c2)),
+ "At least one input split's latest commit time should be equal to
the specified instant time.");
// reset the source context
latch = new CountDownLatch(1);
@@ -287,8 +287,8 @@ public class TestStreamReadMonitoringFunction {
"All instants should have range limit");
assertTrue(sourceContext.splits.stream().allMatch(split ->
isPointInstantRange(split.getInstantRange().get(), c3)),
"All the splits should have point instant range");
- assertTrue(sourceContext.splits.stream().allMatch(split ->
split.getLatestCommit().equals(c3)),
- "All the splits should be with specified instant time");
+ assertTrue(sourceContext.splits.stream().anyMatch(split ->
split.getLatestCommit().equals(c3)),
+ "At least one input split's latest commit time should be equal to
the specified instant time.");
// Stop the stream task.
function.close();