This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 125415a [HUDI-1994] Release the new records iterator for append
handle #close (#3058)
125415a is described below
commit 125415a8b89bb9190d83b3ad74e68f1d86a05118
Author: Danny Chan <[email protected]>
AuthorDate: Thu Jun 10 19:09:23 2021 +0800
[HUDI-1994] Release the new records iterator for append handle #close
(#3058)
---
.../src/main/java/org/apache/hudi/io/HoodieAppendHandle.java | 1 +
hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java | 2 ++
2 files changed, 3 insertions(+)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 616fca7..8ee4b46 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -386,6 +386,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
try {
// flush any remaining records to disk
appendDataAndDeleteBlocks(header);
+ recordItr = null;
if (writer != null) {
writer.close();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index c49f7f1..7b16538 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -532,6 +532,7 @@ public class StreamWriteFunction<K, I, O>
}
bucket.preWrite(records);
final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
+ records.clear();
final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
.taskID(taskID)
.instantTime(instant) // the write instant may shift but the event still use the currentInstant.
@@ -564,6 +565,7 @@ public class StreamWriteFunction<K, I, O>
}
bucket.preWrite(records);
writeStatus.addAll(writeFunction.apply(records, currentInstant));
+ records.clear();
bucket.reset();
}
});