This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-0.12.2-shadow by this push:
new d3d427e179b reverting complicated patch 7340 and fixed some build failures
d3d427e179b is described below
commit d3d427e179b9ffb3dcf2cf764d401582f73352a8
Author: sivabalan <[email protected]>
AuthorDate: Tue Dec 13 16:52:03 2022 -0800
reverting complicated patch 7340 and fixed some build failures
---
.../apache/hudi/table/action/commit/FlinkMergeHelper.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
index 01466484d63..733ad76b975 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
@@ -18,7 +18,9 @@
package org.apache.hudi.table.action.commit;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -39,6 +41,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.Iterator;
@@ -83,10 +86,15 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHe
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
HoodieFileReader<GenericRecord> reader =
HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile,
mergeHandle.getOldFilePath());
+ HoodieFileReader<GenericRecord> bootstrapFileReader = null;
try {
final Iterator<GenericRecord> readerIterator;
if (baseFile.getBootstrapBaseFile().isPresent()) {
- readerIterator = getMergingIterator(table, mergeHandle, baseFile,
reader, readSchema, externalSchemaTransformation);
+ Path bootstrapFilePath = new
Path(baseFile.getBootstrapBaseFile().get().getPath());
+ Configuration bootstrapFileConfig = new
Configuration(table.getHadoopConf());
+ bootstrapFileReader =
HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
+ readerIterator = new MergingIterator<>(reader.getRecordIterator(),
bootstrapFileReader.getRecordIterator(),
+ (inputRecordPair) ->
HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(),
inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
} else {
readerIterator = reader.getRecordIterator(readSchema);
}
@@ -109,6 +117,9 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHe
if (reader != null) {
reader.close();
}
+ if (bootstrapFileReader != null) {
+ bootstrapFileReader.close();
+ }
if (null != wrapper) {
wrapper.shutdownNow();
wrapper.awaitTermination();