This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-0.12.2-shadow by this push:
     new d3d427e179b reverting complicated patch 7340 and fixed some build failures
d3d427e179b is described below

commit d3d427e179b9ffb3dcf2cf764d401582f73352a8
Author: sivabalan <n.siv...@gmail.com>
AuthorDate: Tue Dec 13 16:52:03 2022 -0800

    reverting complicated patch 7340 and fixed some build failures
---
 .../apache/hudi/table/action/commit/FlinkMergeHelper.java   | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
index 01466484d63..733ad76b975 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
@@ -18,7 +18,9 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.MergingIterator;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -39,6 +41,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -83,10 +86,15 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHe
     BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
     Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
     HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
+    HoodieFileReader<GenericRecord> bootstrapFileReader = null;
     try {
       final Iterator<GenericRecord> readerIterator;
       if (baseFile.getBootstrapBaseFile().isPresent()) {
-        readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
+        Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
+        Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
+        bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
+        readerIterator = new MergingIterator<>(reader.getRecordIterator(), bootstrapFileReader.getRecordIterator(),
+            (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
       } else {
         readerIterator = reader.getRecordIterator(readSchema);
       }
@@ -109,6 +117,9 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHe
       if (reader != null) {
         reader.close();
       }
+      if (bootstrapFileReader != null) {
+        bootstrapFileReader.close();
+      }
       if (null != wrapper) {
         wrapper.shutdownNow();
         wrapper.awaitTermination();
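
For readers following the change: the revert drops the shared getMergingIterator helper and instead builds the bootstrap-merge iterator inline, walking the base-file reader and the bootstrap-file reader in lockstep and stitching each pair of records via HoodieAvroUtils.stitchRecords. Below is a minimal standalone sketch of that pairwise-merge pattern in plain Java; the class name PairwiseMergingIterator and the string-concatenation "stitch" are illustrative stand-ins, not Hudi's actual MergingIterator implementation.

import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Illustrative stand-in for org.apache.hudi.client.utils.MergingIterator:
// advances two iterators in lockstep and combines each pair of elements,
// the way the patch pairs base-file records with bootstrap-file records.
final class PairwiseMergingIterator<L, R, O> implements Iterator<O> {
  private final Iterator<L> left;
  private final Iterator<R> right;
  private final BiFunction<L, R, O> stitcher;

  PairwiseMergingIterator(Iterator<L> left, Iterator<R> right, BiFunction<L, R, O> stitcher) {
    this.left = left;
    this.right = right;
    this.stitcher = stitcher;
  }

  @Override
  public boolean hasNext() {
    // Both sides are expected to yield the same number of records.
    return left.hasNext() && right.hasNext();
  }

  @Override
  public O next() {
    return stitcher.apply(left.next(), right.next());
  }

  public static void main(String[] args) {
    // String concatenation stands in for HoodieAvroUtils.stitchRecords,
    // which merges a skeleton record and a data record into one output record.
    Iterator<String> skeleton = List.of("key1|meta1", "key2|meta2").iterator();
    Iterator<String> payload = List.of("colsA", "colsB").iterator();
    new PairwiseMergingIterator<>(skeleton, payload, (l, r) -> l + "+" + r)
        .forEachRemaining(System.out::println); // key1|meta1+colsA, key2|meta2+colsB
  }
}

Note how the patch also tracks bootstrapFileReader in its own local variable so the finally block can close it alongside the base-file reader.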
