This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7462fde [HUDI-2112] Support reading pure logs file group for flink batch reader after compaction (#3202)
7462fde is described below
commit 7462fdefc39c75dca986d65551d871b8c47d4f55
Author: Danny Chan <[email protected]>
AuthorDate: Fri Jul 2 16:29:22 2021 +0800
[HUDI-2112] Support reading pure logs file group for flink batch reader after compaction (#3202)
---
.../org/apache/hudi/table/HoodieTableSource.java | 53 ++++++++--------------
.../apache/hudi/table/format/TestInputFormat.java | 12 ++++-
.../test/java/org/apache/hudi/utils/TestData.java | 11 +++++
3 files changed, 40 insertions(+), 36 deletions(-)
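For readers skimming the diff: the gist of the change is that a MergeOnReadInputSplit no longer requires a base file. After a compaction is scheduled, a file group can consist purely of log files, and the split now takes its base path from the merged file slice, falling back to null when no base file exists. Below is a minimal standalone sketch of that pattern, using java.util.Optional and simplified stand-in types rather than the actual Hudi classes:

    import java.util.List;
    import java.util.Optional;

    public class PureLogSplitSketch {
      // Simplified stand-ins for Hudi's FileSlice and MergeOnReadInputSplit.
      record FileSlice(Optional<String> baseFilePath, List<String> logPaths) {}
      record Split(int id, String basePath /* null for pure-log groups */, List<String> logPaths) {}

      static Split toSplit(int id, FileSlice slice) {
        // The fix in HoodieTableSource does the equivalent of this orElse(null):
        // a pure-log file group yields a split with no base path, and the
        // reader merges the log files on their own.
        return new Split(id, slice.baseFilePath().orElse(null), slice.logPaths());
      }

      public static void main(String[] args) {
        FileSlice pureLogs = new FileSlice(Optional.empty(), List.of("f1.log.1", "f1.log.2"));
        System.out.println(toSplit(0, pureLogs)); // basePath=null, logs still read
      }
    }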
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 786023e..c8f6e2a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -19,7 +19,7 @@
package org.apache.hudi.table;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -29,7 +29,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.source.StreamReadOperator;
import org.apache.hudi.table.format.FilePathUtils;
@@ -272,39 +271,24 @@ public class HoodieTableSource implements
    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), fileStatuses);
-    List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles().collect(Collectors.toList());
    String latestCommit = fsView.getLastInstant().get().getTimestamp();
    final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
    final AtomicInteger cnt = new AtomicInteger(0);
-    if (latestFiles.size() > 0) {
-      Map<HoodieBaseFile, List<String>> fileGroup =
-          HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(hadoopConf, latestFiles);
-      return fileGroup.entrySet().stream().map(kv -> {
-        HoodieBaseFile baseFile = kv.getKey();
-        Option<List<String>> logPaths = kv.getValue().size() == 0
-            ? Option.empty()
-            : Option.of(kv.getValue());
-        return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-            baseFile.getPath(), logPaths, latestCommit,
-            metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
-      }).collect(Collectors.toList());
-    } else {
-      // all the files are logs
-      return Arrays.stream(paths).map(partitionPath -> {
-        String relPartitionPath = FSUtils.getRelativePartitionPath(path, partitionPath);
-        return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
-            .map(fileSlice -> {
-              Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
-                  .sorted(HoodieLogFile.getLogFileComparator())
-                  .map(logFile -> logFile.getPath().toString())
-                  .collect(Collectors.toList()));
-              return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-                  null, logPaths, latestCommit,
-                  metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
-            }).collect(Collectors.toList()); })
-          .flatMap(Collection::stream)
-          .collect(Collectors.toList());
-    }
+    // generates one input split for each file group
+    return Arrays.stream(paths).map(partitionPath -> {
+      String relPartitionPath = FSUtils.getRelativePartitionPath(path, partitionPath);
+      return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
+          .map(fileSlice -> {
+            String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+            Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
+                .sorted(HoodieLogFile.getLogFileComparator())
+                .map(logFile -> logFile.getPath().toString())
+                .collect(Collectors.toList()));
+            return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit,
+                metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
+          }).collect(Collectors.toList()); })
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
}
public InputFormat<RowData, ?> getInputFormat() {
@@ -431,11 +415,12 @@ public class HoodieTableSource implements
}
/**
- * Reload the active timeline view.
+ * Reset the state of the table source.
*/
@VisibleForTesting
- public void reloadActiveTimeline() {
+ public void reset() {
this.metaClient.reloadActiveTimeline();
+ this.requiredPartitions = null;
}
/**
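A note on the reset() rename above: reloading the timeline alone is no longer enough for the updated test below, which writes into brand-new partitions (par5/par6) between reads. If requiredPartitions stayed cached from the previous read, those partitions would presumably be pruned away, so the hook also clears the cache. A hypothetical simplification of the pattern (the field and method names here stand in for the real ones):

    import java.util.List;

    class CachedPlanSourceSketch {
      private List<String> requiredPartitions; // cached result of partition pruning

      // Mirrors the test-only reset() hook: refresh commit metadata and drop
      // any cached partition list so the next plan re-discovers partitions.
      void reset() {
        reloadActiveTimeline();
        this.requiredPartitions = null;
      }

      private void reloadActiveTimeline() { /* stand-in for metaClient.reloadActiveTimeline() */ }
    }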
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 0ce4698..c8885b0 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -92,7 +92,7 @@ public class TestInputFormat {
TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
// refresh the input format
- this.tableSource.reloadActiveTimeline();
+ this.tableSource.reset();
inputFormat = this.tableSource.getInputFormat();
result = readData(inputFormat);
@@ -133,8 +133,12 @@ public class TestInputFormat {
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+ // write another commit using logs with separate partition
+ // so the file group has only logs
+ TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
+
// refresh the input format
- this.tableSource.reloadActiveTimeline();
+ this.tableSource.reset();
inputFormat = this.tableSource.getInputFormat();
result = readData(inputFormat);
@@ -143,6 +147,10 @@ public class TestInputFormat {
expected = "[id1,Danny,24,1970-01-01T00:00:00.001,par1, "
+ "id10,Ella,38,1970-01-01T00:00:00.007,par4, "
+ "id11,Phoebe,52,1970-01-01T00:00:00.008,par4, "
+ + "id12,Monica,27,1970-01-01T00:00:00.009,par5, "
+ + "id13,Phoebe,31,1970-01-01T00:00:00.010,par5, "
+ + "id14,Rachel,52,1970-01-01T00:00:00.011,par6, "
+ + "id15,Ross,29,1970-01-01T00:00:00.012,par6, "
+ "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
+ "id3,Julian,54,1970-01-01T00:00:00.003,par2, "
+ "id4,Fabian,32,1970-01-01T00:00:00.004,par2, "
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index fae0765..50ecf54 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -114,6 +114,17 @@ public class TestData {
TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
);
+  public static List<RowData> DATA_SET_INSERT_SEPARATE_PARTITION = Arrays.asList(
+      insertRow(StringData.fromString("id12"), StringData.fromString("Monica"), 27,
+          TimestampData.fromEpochMillis(9), StringData.fromString("par5")),
+      insertRow(StringData.fromString("id13"), StringData.fromString("Phoebe"), 31,
+          TimestampData.fromEpochMillis(10), StringData.fromString("par5")),
+      insertRow(StringData.fromString("id14"), StringData.fromString("Rachel"), 52,
+          TimestampData.fromEpochMillis(11), StringData.fromString("par6")),
+      insertRow(StringData.fromString("id15"), StringData.fromString("Ross"), 29,
+          TimestampData.fromEpochMillis(12), StringData.fromString("par6"))
+  );
+
public static List<RowData> DATA_SET_INSERT_DUPLICATES = new ArrayList<>();
static {
IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add(