This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 44ca2bbcfd1 [HUDI-6992] IncrementalInputSplits incorrectly set the latestCommit attribute (#9923)
44ca2bbcfd1 is described below

commit 44ca2bbcfd1512a55155e1033a9c9aca132efae6
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Thu Nov 9 15:04:50 2023 +0800

    [HUDI-6992] IncrementalInputSplits incorrectly set the latestCommit attribute (#9923)
---
 .../org/apache/hudi/common/model/FileSlice.java    | 13 ++++--
 .../table/timeline/CompletionTimeQueryView.java    |  7 +--
 .../hudi/common/table/timeline/HoodieTimeline.java | 14 ++++++
 .../apache/hudi/common/model/TestFileSlice.java    | 50 ++++++++++++++++++++++
 .../apache/hudi/source/IncrementalInputSplits.java |  7 ++-
 .../hudi/source/TestIncrementalInputSplits.java    | 47 ++++++++++++++++++++
 .../source/TestStreamReadMonitoringFunction.java   | 12 +++---
 7 files changed, 134 insertions(+), 16 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
index 3f0fcf94156..d071385ea75 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 
 import java.io.Serializable;
@@ -123,9 +124,15 @@ public class FileSlice implements Serializable {
   }
 
   /**
-   * Returns true if there is no data file and no log files. Happens as part 
of pending compaction
-   * 
-   * @return
+   * Returns the latest instant time of the file slice.
+   */
+  public String getLatestInstantTime() {
+    Option<String> latestDeltaCommitTime = 
getLatestLogFile().map(HoodieLogFile::getDeltaCommitTime);
+    return latestDeltaCommitTime.isPresent() ? 
HoodieTimeline.maxInstant(latestDeltaCommitTime.get(), getBaseInstantTime()) : 
getBaseInstantTime();
+  }
+
+  /**
+   * Returns true if there is no data file and no log files. Happens as part 
of pending compaction.
    */
   public boolean isEmpty() {
     return (baseFile == null) && (logFiles.isEmpty());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
index 081cae8cb15..e53f185bffd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import static 
org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.COMPLETION_TIME_ARCHIVED_META_FIELD;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
 
 /**
  * Query view for instant completion time.
@@ -81,7 +80,7 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
   public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String 
cursorInstant) {
     this.metaClient = metaClient;
     this.startToCompletionInstantTimeMap = new ConcurrentHashMap<>();
-    this.cursorInstant = minInstant(cursorInstant, 
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse(""));
+    this.cursorInstant = HoodieTimeline.minInstant(cursorInstant, 
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse(""));
     // Note: use getWriteTimeline() to keep sync with the fs view 
visibleCommitsAndCompactionTimeline, see 
AbstractTableFileSystemView.refreshTimeline.
     this.firstNonSavepointCommit = 
metaClient.getActiveTimeline().getWriteTimeline().getFirstNonSavepointCommit().map(HoodieInstant::getTimestamp).orElse("");
     load();
@@ -207,10 +206,6 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
     this.startToCompletionInstantTimeMap.putIfAbsent(instantTime, 
completionTime);
   }
 
-  private static String minInstant(String instant1, String instant2) {
-    return compareTimestamps(instant1, LESSER_THAN, instant2) ? instant1 : 
instant2;
-  }
-
   public String getCursorInstant() {
     return cursorInstant;
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 82ec439bd25..53c7d25a00c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -415,6 +415,20 @@ public interface HoodieTimeline extends Serializable {
     return predicateToApply.test(commit1, commit2);
   }
 
+  /**
+   * Returns the smaller of the given two instants.
+   */
+  static String minInstant(String instant1, String instant2) {
+    return compareTimestamps(instant1, LESSER_THAN, instant2) ? instant1 : 
instant2;
+  }
+
+  /**
+   * Returns the greater of the given two instants.
+   */
+  static String maxInstant(String instant1, String instant2) {
+    return compareTimestamps(instant1, GREATER_THAN, instant2) ? instant1 : 
instant2;
+  }
+
   /**
    * Return true if specified timestamp is in range (startTs, endTs].
    */
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java
new file mode 100644
index 00000000000..376696bdfa9
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests {@link FileSlice}.
+ */
+public class TestFileSlice {
+  @Test
+  void testGetLatestInstantTime() {
+    String baseInstant = "003";
+    String deltaInstant2 = "002";
+    String deltaInstant4 = "004";
+
+    FileSlice fileSlice = new FileSlice("par1", baseInstant, "fg1");
+    assertThat(fileSlice.getLatestInstantTime(), is(baseInstant));
+
+    fileSlice.addLogFile(new HoodieLogFile(new 
Path(getLogFileName(deltaInstant2))));
+    assertThat(fileSlice.getLatestInstantTime(), is(baseInstant));
+
+    fileSlice.addLogFile(new HoodieLogFile(new 
Path(getLogFileName(deltaInstant4))));
+    assertThat(fileSlice.getLatestInstantTime(), is(deltaInstant4));
+  }
+
+  private static String getLogFileName(String instantTime) {
+    return ".fg1_" + instantTime + ".log.1_1-0-1";
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index d3391141834..2cc96cb8ae6 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -57,6 +57,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -453,11 +454,15 @@ public class IncrementalInputSplits implements 
Serializable {
                   .filter(logPath -> 
!logPath.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
                   .collect(Collectors.toList()));
               String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+              // the latest commit is used as the limit of the log reader 
instant upper threshold,
+              // it must be at least the latest instant time of the file slice 
to avoid data loss.
+              String latestCommit = 
HoodieTimeline.minInstant(fileSlice.getLatestInstantTime(), endInstant);
               return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-                  basePath, logPaths, endInstant,
+                  basePath, logPaths, latestCommit,
                   metaClient.getBasePath(), maxCompactionMemoryInBytes, 
mergeType, instantRange, fileSlice.getFileId());
             }).collect(Collectors.toList()))
         .flatMap(Collection::stream)
+        .sorted(Comparator.comparing(MergeOnReadInputSplit::getLatestCommit))
         .collect(Collectors.toList());
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index 2dff6416a29..92766186065 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -18,16 +18,19 @@
 
 package org.apache.hudi.source;
 
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
@@ -37,6 +40,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.hadoop.fs.FileStatus;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -49,6 +53,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -58,6 +63,7 @@ import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test cases for {@link IncrementalInputSplits}.
@@ -229,6 +235,47 @@ public class TestIncrementalInputSplits extends 
HoodieCommonTestHarness {
     assertEquals(1, intervalBetween2Instants(commitsTimeline, minStartCommit, 
maxEndCommit));
   }
 
+  @Test
+  void testInputSplitsForSplitLastCommit() throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+    conf.set(FlinkOptions.READ_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST);
+    conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
+    conf.set(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true);
+    conf.set(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
+
+    // insert data
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    HoodieTimeline commitsTimeline = 
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    List<HoodieInstant> instants = commitsTimeline.getInstants();
+    String lastInstant  = 
commitsTimeline.lastInstant().map(HoodieInstant::getTimestamp).get();
+    List<HoodieCommitMetadata> metadataList = instants.stream()
+            .map(instant -> WriteProfiles.getCommitMetadata(tableName, new 
Path(basePath), instant, commitsTimeline)).collect(Collectors.toList());
+    FileStatus[] fileStatuses = WriteProfiles.getFilesFromMetadata(new 
Path(basePath), metaClient.getHadoopConf(), metadataList, 
metaClient.getTableType());
+    HoodieTableFileSystemView fileSystemView =
+            new HoodieTableFileSystemView(metaClient, commitsTimeline, 
fileStatuses);
+    Map<String, String> fileIdToBaseInstant = 
fileSystemView.getAllFileSlices("par1")
+            .collect(Collectors.toMap(FileSlice::getFileId, 
FileSlice::getBaseInstantTime));
+
+    IncrementalInputSplits iis = IncrementalInputSplits.builder()
+            .conf(conf)
+            .path(new Path(basePath))
+            .rowType(TestConfigurations.ROW_TYPE)
+            .partitionPruner(null)
+            .build();
+    IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null, 
null, false);
+    result.getInputSplits().stream().filter(split -> 
fileIdToBaseInstant.containsKey(split.getFileId()))
+            .forEach(split -> 
assertEquals(fileIdToBaseInstant.get(split.getFileId()), 
split.getLatestCommit()));
+    assertTrue(result.getInputSplits().stream().anyMatch(split -> 
split.getLatestCommit().equals(lastInstant)),
+            "Some input splits' latest commit time should equal to the last 
instant");
+    assertTrue(result.getInputSplits().stream().anyMatch(split -> 
!split.getLatestCommit().equals(lastInstant)),
+            "The input split latest commit time does not always equal to last 
instant");
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 81bb6f03477..6edeceae0d8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -253,8 +253,8 @@ public class TestStreamReadMonitoringFunction {
           sourceContext.getPartitionPaths(), is("par1"));
       assertTrue(sourceContext.splits.stream().noneMatch(split -> 
split.getInstantRange().isPresent()),
           "No instants should have range limit");
-      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getLatestCommit().equals(c4)),
-          "All the splits should be with specified instant time");
+      assertTrue(sourceContext.splits.stream().anyMatch(split -> 
split.getLatestCommit().equals(c4)),
+          "At least one input split's latest commit time should be equal to 
the specified instant time.");
 
       // reset the source context
       latch = new CountDownLatch(1);
@@ -270,8 +270,8 @@ public class TestStreamReadMonitoringFunction {
           "All instants should have range limit");
       assertTrue(sourceContext.splits.stream().allMatch(split -> 
isPointInstantRange(split.getInstantRange().get(), c2)),
           "All the splits should have point instant range");
-      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getLatestCommit().equals(c2)),
-          "All the splits should be with specified instant time");
+      assertTrue(sourceContext.splits.stream().anyMatch(split -> 
split.getLatestCommit().equals(c2)),
+          "At least one input split's latest commit time should be equal to 
the specified instant time.");
 
       // reset the source context
       latch = new CountDownLatch(1);
@@ -287,8 +287,8 @@ public class TestStreamReadMonitoringFunction {
           "All instants should have range limit");
       assertTrue(sourceContext.splits.stream().allMatch(split -> 
isPointInstantRange(split.getInstantRange().get(), c3)),
           "All the splits should have point instant range");
-      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getLatestCommit().equals(c3)),
-          "All the splits should be with specified instant time");
+      assertTrue(sourceContext.splits.stream().anyMatch(split -> 
split.getLatestCommit().equals(c3)),
+          "At least one input split's latest commit time should be equal to 
the specified instant time.");
 
       // Stop the stream task.
       function.close();

Reply via email to