This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3ec9270 [HUDI-1490] Incremental Query should work even when there are
partitions that have no incremental changes (#2371)
3ec9270 is described below
commit 3ec9270e8e0be4c94106bdb09543971296ebe2d2
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Sat Dec 26 09:17:49 2020 -0800
[HUDI-1490] Incremental Query should work even when there are partitions
that have no incremental changes (#2371)
* Incremental Query should work even when there are partitions that have
no incremental changes
Co-authored-by: Sivabalan Narayanan <[email protected]>
---
.../hudi/hadoop/utils/HoodieInputFormatUtils.java | 2 +-
.../hudi/hadoop/TestHoodieParquetInputFormat.java | 30 ++++++++++++++++++++--
2 files changed, 29 insertions(+), 3 deletions(-)
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index b368851..9b0fbf9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -236,7 +236,7 @@ public class HoodieInputFormatUtils {
return false;
})
.collect(Collectors.joining(","));
- return Option.of(incrementalInputPaths);
+    return StringUtils.isNullOrEmpty(incrementalInputPaths) ? Option.empty() : Option.of(incrementalInputPaths);
}
/**
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 2783889..59214ac 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -117,7 +117,6 @@ public class TestHoodieParquetInputFormat {
assertFalse(filteredTimeline.containsInstant(t5));
assertFalse(filteredTimeline.containsInstant(t6));
-
// remove compaction instant and setup timeline again
instants.remove(t3);
timeline = new HoodieActiveTimeline(metaClient);
@@ -239,6 +238,33 @@ public class TestHoodieParquetInputFormat {
         "We should exclude commit 100 when returning incremental pull with start commit time as 100");
}
+ @Test
+ public void testIncrementalEmptyPartitions() throws IOException {
+ // initial commit
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
+ createCommitFile(basePath, "100", "2016/05/01");
+
+ // Add the paths
+ FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+ InputFormatTestUtil.setupIncremental(jobConf, "000", 1);
+
+ FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length,
+        "We should include only 1 commit 100 when returning incremental pull with start commit time as 100");
+    ensureFilesInCommit("Pulling 1 commits from 000, should get us the 10 files from 100 commit", files, "100", 10);
+
+ // Add new commit only to a new partition
+    partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "200");
+ createCommitFile(basePath, "200", "2017/05/01");
+
+ InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
+ files = inputFormat.listStatus(jobConf);
+
+    assertEquals(0, files.length,
+        "We should exclude commit 200 when returning incremental pull with start commit time as 100 as filePaths does not include new partition");
+ }
+
   private void createCommitFile(java.nio.file.Path basePath, String commitNumber, String partitionPath)
       throws IOException {
     List<HoodieWriteStat> writeStats = HoodieTestUtils.generateFakeHoodieWriteStat(1);
@@ -355,7 +381,7 @@ public class TestHoodieParquetInputFormat {
     String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]);
     conf.set(incrementalMode1, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
     String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]);
-    conf.set(incrementalMode2,HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
+    conf.set(incrementalMode2, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
     String incrementalMode3 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips");
     conf.set(incrementalMode3, HoodieHiveUtils.INCREMENTAL_SCAN_MODE.toLowerCase());
     String defaultmode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");