This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 528f4ca [HUDI-1880] Support streaming read with compaction and cleaning (#2921)
528f4ca is described below
commit 528f4ca988209cbc9e519bd42b87b896aea992b6
Author: Danny Chan <[email protected]>
AuthorDate: Fri May 7 20:04:35 2021 +0800
[HUDI-1880] Support streaming read with compaction and cleaning (#2921)
---
.../java/org/apache/hudi/io/FlinkMergeHandle.java | 14 ++++-----
.../org/apache/hudi/sink/StreamWriteFunction.java | 9 ++----
.../hudi/source/StreamReadMonitoringFunction.java | 33 ++++++++++++----------
.../table/format/mor/MergeOnReadInputFormat.java | 11 ++++++++
.../sink/utils/StreamWriteFunctionWrapper.java | 1 -
5 files changed, 36 insertions(+), 32 deletions(-)
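For orientation before the diff: the streaming read path exercised here is driven by the FlinkOptions constants referenced below. A minimal, hypothetical setup sketch (the option constants appear in the diff; the check interval and start-commit values are placeholders):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    Configuration conf = new Configuration();
    // table type constant referenced in the diff below
    conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
    // how often the monitoring source re-scans the timeline for new instants
    conf.setInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 4);
    // optionally start consuming from a given instant
    conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, "20210507000000");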
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index 4da6404..7fffd6b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -87,8 +87,9 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
* Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
*/
protected String generatesDataFileNameWithRollover() {
- final String fileID = this.fileId + "-" + rollNumber;
- return FSUtils.makeDataFileName(instantTime, writeToken, fileID, hoodieTable.getBaseFileExtension());
+ // mark the intermediate file as hidden
+ return FSUtils.makeDataFileName("." + instantTime,
+     writeToken + "-" + rollNumber, this.fileId, hoodieTable.getBaseFileExtension());
}
public boolean shouldRollover() {
@@ -193,13 +194,8 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
throw new HoodieIOException("Error when clean the temporary roll file: " + path, e);
}
}
- Path lastPath = rolloverPaths.size() > 0
- ? rolloverPaths.get(rolloverPaths.size() - 1)
- : newFilePath;
- String newFileName = generatesDataFileName();
- String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
- + newFileName).toString();
- final Path desiredPath = new Path(config.getBasePath(), relativePath);
+ final Path lastPath = rolloverPaths.get(rolloverPaths.size() - 1);
+ final Path desiredPath = rolloverPaths.get(0);
try {
fs.rename(lastPath, desiredPath);
} catch (IOException e) {
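The simplified rename above works because the intermediate roll files are now written with a dot-prefixed name (see generatesDataFileNameWithRollover) and so stay out of normal listings. As a rough illustration, not part of the commit, a conventional Hadoop hidden-file filter looks like this:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    // Names starting with '.' or '_' are treated as hidden by Hadoop-style
    // listings, so concurrent readers never observe in-progress roll files.
    PathFilter visibleFilesOnly = (Path p) -> {
      String name = p.getName();
      return !name.startsWith(".") && !name.startsWith("_");
    };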
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 8244226..5c3cbb7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -33,7 +33,6 @@ import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -92,7 +91,7 @@ import java.util.function.BiFunction;
*/
public class StreamWriteFunction<K, I, O>
extends KeyedProcessFunction<K, I, O>
- implements CheckpointedFunction, CheckpointListener {
+ implements CheckpointedFunction {
private static final long serialVersionUID = 1L;
@@ -181,11 +180,6 @@ public class StreamWriteFunction<K, I, O>
}
}
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- this.writeClient.cleanHandles();
- }
-
/**
* End input action for batch source.
*/
@@ -390,6 +384,7 @@ public class StreamWriteFunction<K, I, O>
.build();
this.eventGateway.sendEventToCoordinator(event);
this.buckets.clear();
+ this.writeClient.cleanHandles();
this.currentInstant = "";
}
}
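Rationale for this move: notifyCheckpointComplete() is delivered asynchronously by Flink and is not guaranteed to fire for every checkpoint, so handle cleanup now runs synchronously when the buffered data is flushed. A minimal sketch of the resulting lifecycle (flushBuffer is a hypothetical name for the method containing the hunk above):

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
      // flushBuffer() sends the WriteMetadataEvent to the coordinator, clears
      // the buffered buckets, and now also releases the write handles via
      // writeClient.cleanHandles() before resetting the current instant.
      flushBuffer();
    }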
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index f508726..b6eb397 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -112,8 +112,6 @@ public class StreamReadMonitoringFunction
private final long maxCompactionMemoryInBytes;
- private final boolean isDelta;
-
public StreamReadMonitoringFunction(
Configuration conf,
Path path,
@@ -124,7 +122,6 @@ public class StreamReadMonitoringFunction
this.metaClient = metaClient;
this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
- this.isDelta = conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
}
@Override
@@ -189,15 +186,12 @@ public class StreamReadMonitoringFunction
@VisibleForTesting
public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) {
metaClient.reloadActiveTimeline();
- HoodieTimeline commitTimeline = isDelta
- // if is delta, exclude the parquet files from compaction
- ? metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
- : metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
if (commitTimeline.empty()) {
LOG.warn("No splits found for the table under path " + path);
return;
}
- List<HoodieInstant> instants = getUncompactedInstants(commitTimeline, this.issuedInstant);
+ List<HoodieInstant> instants = filterInstantsWithStart(commitTimeline, this.issuedInstant);
// get the latest instant that satisfies condition
final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
final InstantRange instantRange;
@@ -303,29 +297,26 @@ public class StreamReadMonitoringFunction
}
/**
- * Returns the uncompacted instants with a given issuedInstant to start from.
+ * Returns the instants with a given issuedInstant to start from.
*
* @param commitTimeline The completed commits timeline
* @param issuedInstant The last issued instant that has already been delivered to downstream
* @return the filtered hoodie instants
*/
- private List<HoodieInstant> getUncompactedInstants(
+ private List<HoodieInstant> filterInstantsWithStart(
HoodieTimeline commitTimeline,
final String issuedInstant) {
if (issuedInstant != null) {
return commitTimeline.getInstants()
- .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
.collect(Collectors.toList());
} else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
String definedStartCommit = this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
return commitTimeline.getInstants()
- .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, definedStartCommit))
.collect(Collectors.toList());
} else {
return commitTimeline.getInstants()
- .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
.collect(Collectors.toList());
}
}
@@ -357,14 +348,26 @@ public class StreamReadMonitoringFunction
private List<FileStatus> getWritePathsOfInstant(HoodieCommitMetadata metadata, FileSystem fs) {
return metadata.getFileIdAndFullPaths(path.toString()).values().stream()
- .map(path -> {
+ .map(org.apache.hadoop.fs.Path::new)
+ // filter out the file paths that do not exist, some files may be cleaned by
+ // the cleaner.
+ .filter(path -> {
+ try {
+ return fs.exists(path);
+ } catch (IOException e) {
+ LOG.error("Checking exists of path: {} error", path);
+ throw new HoodieException(e);
+ }
+ }).map(path -> {
try {
- return fs.getFileStatus(new org.apache.hadoop.fs.Path(path));
+ return fs.getFileStatus(path);
} catch (IOException e) {
LOG.error("Get write status of path: {} error", path);
throw new HoodieException(e);
}
})
+ // filter out corrupted (zero-length) files
+ .filter(fileStatus -> fileStatus.getLen() > 0)
.collect(Collectors.toList());
}
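The two filters above make the listing robust under concurrent table services: paths already removed by the cleaner are skipped, and zero-length files are dropped. A self-contained sketch of the same pipeline using only Hadoop FileSystem calls (the helper name is illustrative, not from the commit):

    import java.io.IOException;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    static List<FileStatus> existingNonEmptyFiles(Stream<String> fullPaths, FileSystem fs) {
      return fullPaths
          .map(Path::new)
          // drop paths the cleaner has already removed
          .filter(p -> {
            try {
              return fs.exists(p);
            } catch (IOException e) {
              throw new RuntimeException("Error checking existence of path: " + p, e);
            }
          })
          .map(p -> {
            try {
              return fs.getFileStatus(p);
            } catch (IOException e) {
              throw new RuntimeException("Error getting status of path: " + p, e);
            }
          })
          // drop zero-length files
          .filter(status -> status.getLen() > 0)
          .collect(Collectors.toList());
    }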
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 595bf5a..f0f4f41 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -536,6 +536,9 @@ public class MergeOnReadInputFormat
private final GenericRecordBuilder recordBuilder;
private final RowDataProjection projection;
+
+ private final InstantRange instantRange;
+
// add the flag because the flink ParquetColumnarRowSplitReader is buggy:
// method #reachedEnd() returns false after it returns true.
// refactor it out once FLINK-22370 is resolved.
@@ -564,12 +567,20 @@ public class MergeOnReadInputFormat
this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType);
this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
this.projection = RowDataProjection.instance(requiredRowType, requiredPos);
+ this.instantRange = split.getInstantRange().orElse(null);
}
@Override
public boolean reachedEnd() throws IOException {
if (!readLogs && !this.reader.reachedEnd()) {
currentRecord = this.reader.nextRecord();
+ if (instantRange != null) {
+ boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
+ if (!isInRange) {
+ // filter base file by instant range
+ return reachedEnd();
+ }
+ }
final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
if (logRecords.containsKey(curKey)) {
keyToSkip.add(curKey);
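Because compaction commits are now visible to the streaming reader, a base file can contain records from instants outside the split's range, hence the per-record commit-time check above. An equivalent iterative sketch of the recursive skip (simplified; the real method also merges log records afterwards):

    // Advance the base-file reader until a record whose commit time falls
    // inside the split's instant range is found, skipping the rest.
    while (!reader.reachedEnd()) {
      RowData record = reader.nextRecord();
      String commitTime = record.getString(HOODIE_COMMIT_TIME_COL_POS).toString();
      if (instantRange == null || instantRange.isInRange(commitTime)) {
        currentRecord = record;
        break;
      }
    }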
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index a4b6c16..5050109 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -163,7 +163,6 @@ public class StreamWriteFunctionWrapper<I> {
functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
coordinator.notifyCheckpointComplete(checkpointId);
this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
- this.writeFunction.notifyCheckpointComplete(checkpointId);
if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
try {
compactFunctionWrapper.compact(checkpointId);