This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 29bc5fd [HUDI-2996] Flink streaming reader 'skip_compaction' option does not work (#4304)
29bc5fd is described below
commit 29bc5fd912df9517acb9e15ead97f1849f21056b
Author: Fugle666 <[email protected]>
AuthorDate: Tue Dec 14 11:31:36 2021 +0800
[HUDI-2996] Flink streaming reader 'skip_compaction' option does not work (#4304)
close apache/hudi#4304
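
For reference, the option named in the title is a read option of the Hudi Flink connector (the key is 'read.streaming.skip_compaction' in this version); when enabled, the streaming reader is supposed to skip instants produced by the compactor. A minimal, non-authoritative sketch of turning it on from a Flink SQL job, with table name, path, and schema as placeholders:

    // Sketch only: a MERGE_ON_READ table source with streaming read enabled
    // and compaction instants skipped on the read path.
    streamTableEnv.executeSql(
        "create table t1 (uuid varchar(20), name varchar(10), ts timestamp(3)) with ("
            + " 'connector' = 'hudi',"
            + " 'path' = '/path/to/table',"
            + " 'table.type' = 'MERGE_ON_READ',"
            + " 'read.streaming.enabled' = 'true',"
            + " 'read.streaming.skip_compaction' = 'true')");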
---
.../java/org/apache/hudi/source/IncrementalInputSplits.java | 2 +-
.../java/org/apache/hudi/table/HoodieDataSourceITCase.java | 9 +++++++--
.../src/test/java/org/apache/hudi/utils/TestUtils.java | 13 +++++++++++++
3 files changed, 21 insertions(+), 3 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index fbb77c6..58c38ef 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -286,7 +286,7 @@ public class IncrementalInputSplits implements Serializable {
HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
if (issuedInstant != null) {
// returns early for streaming mode
- return completedTimeline.getInstants()
+ return maySkipCompaction(completedTimeline.getInstants())
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
.collect(Collectors.toList());
}
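
The fix routes the streaming-mode early return through maySkipCompaction before the issued-instant filter. The helper itself is not part of this hunk; a minimal sketch of what such a method can look like, assuming the option is held in a boolean field named skipCompaction on IncrementalInputSplits and that HoodieInstant, HoodieTimeline, and java.util.stream.Stream are already imported in the class:

    // Sketch only (field and method body are assumptions, not from this diff):
    // when skip_compaction is on, drop instants written by compaction, which
    // show up as 'commit' actions on a MERGE_ON_READ table, so the streaming
    // reader only consumes delta commits.
    private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant> instants) {
      return this.skipCompaction
          ? instants.filter(instant -> !HoodieTimeline.COMMIT_ACTION.equals(instant.getAction()))
          : instants;
    }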
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 2844261..9eef2fe 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -245,7 +245,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
@Test
void testStreamWriteReadSkippingCompaction() throws Exception {
// create filesystem table named source
- String createSource = TestConfigurations.getFileSourceDDL("source");
+ String createSource = TestConfigurations.getFileSourceDDL("source", 4);
streamTableEnv.executeSql(createSource);
String hoodieTableDDL = sql("t1")
@@ -260,7 +260,12 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto);
- List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+ String instant = TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 2, true);
+
+ streamTableEnv.getConfig().getConfiguration()
+ .setBoolean("table.dynamic-table-options.enabled", true);
+ final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant);
+ List<Row> rows = execSelectSql(streamTableEnv, query, 10);
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
}
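
The updated test relies on Flink's dynamic table options (the /*+ OPTIONS(...) */ hint), which are disabled by default and are switched on here via 'table.dynamic-table-options.enabled'. A minimal sketch of the same pattern outside the test, with the table name and instant value as placeholders:

    // Sketch: enable per-query option hints, then start the streaming read
    // from a chosen completed instant instead of the default start point.
    streamTableEnv.getConfig().getConfiguration()
        .setBoolean("table.dynamic-table-options.enabled", true);
    String query = "select * from t1 /*+ OPTIONS('read.start-commit'='<instant>') */";
    TableResult result = streamTableEnv.executeSql(query);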
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
index 92e16cd..466ccdf 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -29,6 +29,8 @@ import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
+import javax.annotation.Nullable;
+
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -64,6 +66,17 @@ public class TestUtils {
.map(HoodieInstant::getTimestamp).orElse(null);
}
+ @Nullable
+ public static String getNthCompleteInstant(String basePath, int n, boolean isDelta) {
+ final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+ .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+ return metaClient.getActiveTimeline()
+ .filterCompletedInstants()
+ .filter(instant -> isDelta ? HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()) : HoodieTimeline.COMMIT_ACTION.equals(instant.getAction()))
+ .nthInstant(n).map(HoodieInstant::getTimestamp)
+ .orElse(null);
+ }
+
public static String getSplitPartitionPath(MergeOnReadInputSplit split) {
assertTrue(split.getLogPaths().isPresent());
final String logPath = split.getLogPaths().get().get(0);
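
The new helper returns the timestamp of the n-th completed instant of the requested action type (delta commit or commit), or null when the timeline has fewer instants. A hedged usage sketch, with the base path as a placeholder and n treated as a zero-based index as in the test above:

    // Sketch: timestamp of the third completed delta commit under the table
    // path, e.g. to feed into 'read.start-commit' in a test query; null if
    // that many delta commits do not exist yet.
    String thirdDeltaCommit = TestUtils.getNthCompleteInstant("/path/to/table", 2, true);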