This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 77cfb3a51da [HUDI-7147] Fix CDC write flush bug (#10186)
77cfb3a51da is described below

commit 77cfb3a51dabffdbfc67ba0ac85cfa129277c166
Author: YueZhang <[email protected]>
AuthorDate: Wed Nov 29 09:46:53 2023 +0800

    [HUDI-7147] Fix CDC write flush bug (#10186)
    
    * Use iterator() instead of values() on the CDC data map to avoid an UnsupportedOperationException
    
    * Fix checkstyle issues
---
 .../java/org/apache/hudi/io/HoodieCDCLogger.java   | 23 +++++++++++-----------
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index cab978164d8..1e2fa7c59e4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -53,10 +53,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.DATA_BEFORE;
 import static 
org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.DATA_BEFORE_AFTER;
@@ -84,7 +84,7 @@ public class HoodieCDCLogger implements Closeable {
   private final Schema cdcSchema;
 
   // the cdc data
-  private final Map<String, HoodieAvroPayload> cdcData;
+  private final ExternalSpillableMap<String, HoodieAvroPayload> cdcData;
 
   private final Map<HoodieLogBlock.HeaderMetadataType, String> 
cdcDataBlockHeader;
 
@@ -183,15 +183,16 @@ public class HoodieCDCLogger implements Closeable {
   private void flushIfNeeded(Boolean force) {
     if (force || numOfCDCRecordsInMemory.get() * averageCDCRecordSize >= 
maxBlockSize) {
       try {
-        List<HoodieRecord> records = cdcData.values().stream()
-            .map(record -> {
-              try {
-                return new 
HoodieAvroIndexedRecord(record.getInsertValue(cdcSchema).get());
-              } catch (IOException e) {
-                throw new HoodieIOException("Failed to get cdc record", e);
-              }
-            }).collect(Collectors.toList());
-
+        ArrayList<HoodieRecord> records = new ArrayList<>();
+        Iterator<HoodieAvroPayload> recordIter = cdcData.iterator();
+        while (recordIter.hasNext()) {
+          HoodieAvroPayload record = recordIter.next();
+          try {
+            records.add(new 
HoodieAvroIndexedRecord(record.getInsertValue(cdcSchema).get()));
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to get cdc record", e);
+          }
+        }
         HoodieLogBlock block = new HoodieCDCDataBlock(records, 
cdcDataBlockHeader, keyField);
         AppendResult result = 
cdcWriter.appendBlocks(Collections.singletonList(block));
 

Reply via email to