This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7462fde  [HUDI-2112] Support reading pure logs file group for flink batch reader after compaction (#3202)
7462fde is described below

commit 7462fdefc39c75dca986d65551d871b8c47d4f55
Author: Danny Chan <[email protected]>
AuthorDate: Fri Jul 2 16:29:22 2021 +0800

    [HUDI-2112] Support reading pure logs file group for flink batch reader after compaction (#3202)
---
 .../org/apache/hudi/table/HoodieTableSource.java   | 53 ++++++++--------------
 .../apache/hudi/table/format/TestInputFormat.java  | 12 ++++-
 .../test/java/org/apache/hudi/utils/TestData.java  | 11 +++++
 3 files changed, 40 insertions(+), 36 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 786023e..c8f6e2a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -29,7 +29,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.HoodieROTablePathFilter;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
 import org.apache.hudi.source.StreamReadOperator;
 import org.apache.hudi.table.format.FilePathUtils;
@@ -272,39 +271,24 @@ public class HoodieTableSource implements
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
         metaClient.getActiveTimeline().getCommitsTimeline()
             .filterCompletedInstants(), fileStatuses);
-    List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles().collect(Collectors.toList());
     String latestCommit = fsView.getLastInstant().get().getTimestamp();
     final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
     final AtomicInteger cnt = new AtomicInteger(0);
-    if (latestFiles.size() > 0) {
-      Map<HoodieBaseFile, List<String>> fileGroup =
-          HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(hadoopConf, latestFiles);
-      return fileGroup.entrySet().stream().map(kv -> {
-        HoodieBaseFile baseFile = kv.getKey();
-        Option<List<String>> logPaths = kv.getValue().size() == 0
-            ? Option.empty()
-            : Option.of(kv.getValue());
-        return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-            baseFile.getPath(), logPaths, latestCommit,
-            metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
-      }).collect(Collectors.toList());
-    } else {
-      // all the files are logs
-      return Arrays.stream(paths).map(partitionPath -> {
-        String relPartitionPath = FSUtils.getRelativePartitionPath(path, partitionPath);
-        return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
-            .map(fileSlice -> {
-              Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
-                  .sorted(HoodieLogFile.getLogFileComparator())
-                  .map(logFile -> logFile.getPath().toString())
-                  .collect(Collectors.toList()));
-              return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-                  null, logPaths, latestCommit,
-                  metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
-            }).collect(Collectors.toList()); })
-          .flatMap(Collection::stream)
-          .collect(Collectors.toList());
-    }
+    // generates one input split for each file group
+    return Arrays.stream(paths).map(partitionPath -> {
+      String relPartitionPath = FSUtils.getRelativePartitionPath(path, partitionPath);
+      return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
+          .map(fileSlice -> {
+            String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+            Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
+                .sorted(HoodieLogFile.getLogFileComparator())
+                .map(logFile -> logFile.getPath().toString())
+                .collect(Collectors.toList()));
+            return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit,
+                metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
+          }).collect(Collectors.toList()); })
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
   }
 
   public InputFormat<RowData, ?> getInputFormat() {
@@ -431,11 +415,12 @@ public class HoodieTableSource implements
   }
 
   /**
-   * Reload the active timeline view.
+   * Reset the state of the table source.
    */
   @VisibleForTesting
-  public void reloadActiveTimeline() {
+  public void reset() {
     this.metaClient.reloadActiveTimeline();
+    this.requiredPartitions = null;
   }
 
   /**
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 0ce4698..c8885b0 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -92,7 +92,7 @@ public class TestInputFormat {
     TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
 
     // refresh the input format
-    this.tableSource.reloadActiveTimeline();
+    this.tableSource.reset();
     inputFormat = this.tableSource.getInputFormat();
 
     result = readData(inputFormat);
@@ -133,8 +133,12 @@ public class TestInputFormat {
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
     TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
 
+    // write another commit using logs with separate partition
+    // so the file group has only logs
+    TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
+
     // refresh the input format
-    this.tableSource.reloadActiveTimeline();
+    this.tableSource.reset();
     inputFormat = this.tableSource.getInputFormat();
 
     result = readData(inputFormat);
@@ -143,6 +147,10 @@ public class TestInputFormat {
     expected = "[id1,Danny,24,1970-01-01T00:00:00.001,par1, "
         + "id10,Ella,38,1970-01-01T00:00:00.007,par4, "
         + "id11,Phoebe,52,1970-01-01T00:00:00.008,par4, "
+        + "id12,Monica,27,1970-01-01T00:00:00.009,par5, "
+        + "id13,Phoebe,31,1970-01-01T00:00:00.010,par5, "
+        + "id14,Rachel,52,1970-01-01T00:00:00.011,par6, "
+        + "id15,Ross,29,1970-01-01T00:00:00.012,par6, "
         + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
         + "id3,Julian,54,1970-01-01T00:00:00.003,par2, "
         + "id4,Fabian,32,1970-01-01T00:00:00.004,par2, "
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index fae0765..50ecf54 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -114,6 +114,17 @@ public class TestData {
           TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
   );
 
+  public static List<RowData> DATA_SET_INSERT_SEPARATE_PARTITION = Arrays.asList(
+      insertRow(StringData.fromString("id12"), StringData.fromString("Monica"), 27,
+          TimestampData.fromEpochMillis(9), StringData.fromString("par5")),
+      insertRow(StringData.fromString("id13"), StringData.fromString("Phoebe"), 31,
+          TimestampData.fromEpochMillis(10), StringData.fromString("par5")),
+      insertRow(StringData.fromString("id14"), StringData.fromString("Rachel"), 52,
+          TimestampData.fromEpochMillis(11), StringData.fromString("par6")),
+      insertRow(StringData.fromString("id15"), StringData.fromString("Ross"), 29,
+          TimestampData.fromEpochMillis(12), StringData.fromString("par6"))
+  );
+
   public static List<RowData> DATA_SET_INSERT_DUPLICATES = new ArrayList<>();
   static {
     IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add(

Reply via email to