This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 29bc5fd  [HUDI-2996] Flink streaming reader 'skip_compaction' option does not work (#4304)
29bc5fd is described below

commit 29bc5fd912df9517acb9e15ead97f1849f21056b
Author: Fugle666 <[email protected]>
AuthorDate: Tue Dec 14 11:31:36 2021 +0800

    [HUDI-2996] Flink streaming reader 'skip_compaction' option does not work (#4304)
    
    close apache/hudi#4304
---
 .../java/org/apache/hudi/source/IncrementalInputSplits.java |  2 +-
 .../java/org/apache/hudi/table/HoodieDataSourceITCase.java  |  9 +++++++--
 .../src/test/java/org/apache/hudi/utils/TestUtils.java      | 13 +++++++++++++
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index fbb77c6..58c38ef 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -286,7 +286,7 @@ public class IncrementalInputSplits implements Serializable {
     HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
     if (issuedInstant != null) {
       // returns early for streaming mode
-      return completedTimeline.getInstants()
+      return maySkipCompaction(completedTimeline.getInstants())
           .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
           .collect(Collectors.toList());
     }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 2844261..9eef2fe 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -245,7 +245,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @Test
   void testStreamWriteReadSkippingCompaction() throws Exception {
     // create filesystem table named source
-    String createSource = TestConfigurations.getFileSourceDDL("source");
+    String createSource = TestConfigurations.getFileSourceDDL("source", 4);
     streamTableEnv.executeSql(createSource);
 
     String hoodieTableDDL = sql("t1")
@@ -260,7 +260,12 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String insertInto = "insert into t1 select * from source";
     execInsertSql(streamTableEnv, insertInto);
 
-    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+    String instant = TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 2, true);
+
+    streamTableEnv.getConfig().getConfiguration()
+        .setBoolean("table.dynamic-table-options.enabled", true);
+    final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant);
+    List<Row> rows = execSelectSql(streamTableEnv, query, 10);
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
   }
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
index 92e16cd..466ccdf 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -29,6 +29,8 @@ import org.apache.hudi.util.StreamerUtil;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 
+import javax.annotation.Nullable;
+
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -64,6 +66,17 @@ public class TestUtils {
         .map(HoodieInstant::getTimestamp).orElse(null);
   }
 
+  @Nullable
+  public static String getNthCompleteInstant(String basePath, int n, boolean isDelta) {
+    final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+    return metaClient.getActiveTimeline()
+        .filterCompletedInstants()
+        .filter(instant -> isDelta ? HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()) : HoodieTimeline.COMMIT_ACTION.equals(instant.getAction()))
+        .nthInstant(n).map(HoodieInstant::getTimestamp)
+        .orElse(null);
+  }
+
   public static String getSplitPartitionPath(MergeOnReadInputSplit split) {
     assertTrue(split.getLogPaths().isPresent());
     final String logPath = split.getLogPaths().get().get(0);

Reply via email to