This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 125415a  [HUDI-1994] Release the new records iterator for append handle #close (#3058)
125415a is described below

commit 125415a8b89bb9190d83b3ad74e68f1d86a05118
Author: Danny Chan <[email protected]>
AuthorDate: Thu Jun 10 19:09:23 2021 +0800

    [HUDI-1994] Release the new records iterator for append handle #close (#3058)
---
 .../src/main/java/org/apache/hudi/io/HoodieAppendHandle.java            | 1 +
 hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java  | 2 ++
 2 files changed, 3 insertions(+)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 616fca7..8ee4b46 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -386,6 +386,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
     try {
       // flush any remaining records to disk
       appendDataAndDeleteBlocks(header);
+      recordItr = null;
       if (writer != null) {
         writer.close();
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index c49f7f1..7b16538 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -532,6 +532,7 @@ public class StreamWriteFunction<K, I, O>
     }
     bucket.preWrite(records);
     final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
+    records.clear();
     final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
         .taskID(taskID)
         .instantTime(instant) // the write instant may shift but the event still use the currentInstant.
@@ -564,6 +565,7 @@ public class StreamWriteFunction<K, I, O>
               }
               bucket.preWrite(records);
               writeStatus.addAll(writeFunction.apply(records, currentInstant));
+              records.clear();
               bucket.reset();
             }
           });

Reply via email to