nsivabalan commented on code in PR #13580:
URL: https://github.com/apache/hudi/pull/13580#discussion_r2219913626


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -82,6 +91,7 @@ public class FileGroupReaderBasedMergeHandle<T, I, K, O> 
extends HoodieMergeHand
   private HoodieReadStats readStats;
   private final HoodieRecord.HoodieRecordType recordType;
   private final Option<HoodieCDCLogger> cdcLogger;
+  private final Iterator<HoodieRecord<T>> recordIterator;

Review Comment:
   Can we make this an `Option` instead of dealing with nulls?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -108,6 +118,42 @@ public FileGroupReaderBasedMergeHandle(HoodieWriteConfig 
config, String instantT
     } else {
       this.cdcLogger = Option.empty();
     }
+    this.recordIterator = null;
+    init(operation, this.partitionPath, fileSlice.getBaseFile());
+  }
+
+  /**
+   * FG reader based generic merge handle, which is not just for compaction.
+   */
+  public FileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                         Iterator<HoodieRecord<T>> recordItr, 
String partitionPath, String fileId,
+                                         TaskContextSupplier 
taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt,
+                                         HoodieReaderContext<T> readerContext, 
String maxInstantTime,
+                                         HoodieRecord.HoodieRecordType 
enginRecordType) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, 
taskContextSupplier, keyGeneratorOpt);
+    this.maxInstantTime = maxInstantTime;
+    this.keyToNewRecords = Collections.emptyMap();
+    this.readerContext = readerContext;
+    this.fileSlice = null;
+    this.recordIterator = recordItr;
+    this.operation = null;
+    if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
+      this.cdcLogger = Option.of(new HoodieCDCLogger(
+          instantTime,
+          config,
+          hoodieTable.getMetaClient().getTableConfig(),
+          partitionPath,
+          storage,
+          getWriterSchema(),
+          createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX, 
Option.empty()),
+          IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config)));
+    } else {
+      this.cdcLogger = Option.empty();
+    }
+    // If the table is a metadata table or the base file is an HFile, we use 
AVRO record type, otherwise we use the engine record type.
+    this.recordType = (hoodieTable.isMetadataTable()
+        || HFILE.getFileExtension().equals(hoodieTable.getBaseFileExtension()))
+        ? HoodieRecord.HoodieRecordType.AVRO : enginRecordType;

Review Comment:
   We are setting `fileSlice` to null in L 137. So, we should be hitting a 
NullPointerException at L 157 when we call init(), where the last arg is 
`fileSlice.getBaseFile()`.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/InputBasedFileGroupRecordBuffer.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.table.log.KeySpec;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+
+public class InputBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupRecordBuffer<T> {
+  private final Iterator<T> inputRecordIterator;
+
+  public InputBasedFileGroupRecordBuffer(HoodieReaderContext readerContext,

Review Comment:
   RecordIteratorBasedFileGroupRecordBuffer



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -108,6 +118,42 @@ public FileGroupReaderBasedMergeHandle(HoodieWriteConfig 
config, String instantT
     } else {
       this.cdcLogger = Option.empty();
     }
+    this.recordIterator = null;
+    init(operation, this.partitionPath, fileSlice.getBaseFile());
+  }
+
+  /**
+   * FG reader based generic merge handle, which is not just for compaction.
+   */
+  public FileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                         Iterator<HoodieRecord<T>> recordItr, 
String partitionPath, String fileId,
+                                         TaskContextSupplier 
taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt,
+                                         HoodieReaderContext<T> readerContext, 
String maxInstantTime,
+                                         HoodieRecord.HoodieRecordType 
enginRecordType) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, 
taskContextSupplier, keyGeneratorOpt);
+    this.maxInstantTime = maxInstantTime;

Review Comment:
   +1 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -108,6 +118,42 @@ public FileGroupReaderBasedMergeHandle(HoodieWriteConfig 
config, String instantT
     } else {
       this.cdcLogger = Option.empty();
     }
+    this.recordIterator = null;
+    init(operation, this.partitionPath, fileSlice.getBaseFile());
+  }
+
+  /**
+   * FG reader based generic merge handle, which is not just for compaction.
+   */
+  public FileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                         Iterator<HoodieRecord<T>> recordItr, 
String partitionPath, String fileId,
+                                         TaskContextSupplier 
taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt,
+                                         HoodieReaderContext<T> readerContext, 
String maxInstantTime,
+                                         HoodieRecord.HoodieRecordType 
enginRecordType) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, 
taskContextSupplier, keyGeneratorOpt);
+    this.maxInstantTime = maxInstantTime;
+    this.keyToNewRecords = Collections.emptyMap();
+    this.readerContext = readerContext;
+    this.fileSlice = null;
+    this.recordIterator = recordItr;
+    this.operation = null;
+    if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
+      this.cdcLogger = Option.of(new HoodieCDCLogger(
+          instantTime,
+          config,
+          hoodieTable.getMetaClient().getTableConfig(),
+          partitionPath,
+          storage,
+          getWriterSchema(),
+          createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX, 
Option.empty()),
+          IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config)));
+    } else {
+      this.cdcLogger = Option.empty();
+    }
+    // If the table is a metadata table or the base file is an HFile, we use 
AVRO record type, otherwise we use the engine record type.
+    this.recordType = (hoodieTable.isMetadataTable()
+        || HFILE.getFileExtension().equals(hoodieTable.getBaseFileExtension()))
+        ? HoodieRecord.HoodieRecordType.AVRO : enginRecordType;

Review Comment:
   I checked the constructor of this class in tandem with `HoodieMergeHandle`. 
   we should probably fix L157 last arg to 
   ```
   Option.of(baseFileToMerge))
   ```
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -116,16 +117,69 @@ public HoodieFileGroupReader(HoodieReaderContext<T> 
readerContext, HoodieStorage
       long start, long length, boolean shouldUseRecordPosition) {
     this(readerContext, storage, tablePath, latestCommitTime, fileSlice, 
dataSchema,
         requestedSchema, internalSchemaOpt, hoodieTableMetaClient, props, 
start, length,
-        shouldUseRecordPosition, false, false, false, Option.empty(), false);
+        shouldUseRecordPosition, false, false, false, Collections.emptyList(), 
false);
+  }
+
+  private HoodieFileGroupReader(HoodieReaderContext<T> readerContext, 
HoodieStorage storage, String tablePath,
+                                String latestCommitTime, HoodieBaseFile 
baseFile, Iterator<T> recordIterator,
+                                Schema dataSchema, Schema requestedSchema, 
Option<InternalSchema> internalSchemaOpt,
+                                HoodieTableMetaClient hoodieTableMetaClient, 
TypedProperties props,
+                                long start, long length, boolean 
shouldUseRecordPosition, boolean allowInflightInstants,
+                                boolean emitDelete, boolean sortOutput,
+                                List<BaseFileUpdateCallback<T>> 
updateCallbacks, boolean enableOptimizedLogBlockScan) {
+    this.readerContext = readerContext;

Review Comment:
   +1 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -217,6 +269,28 @@ public void write() {
     }
   }
 
+  private List<BaseFileUpdateCallback<T>> createCallbacks() {
+    List<BaseFileUpdateCallback<T>> callbacks = new ArrayList<>();
+    // Handle CDC workflow.
+    if (cdcLogger.isPresent()) {
+      callbacks.add(new CDCCallback<>(cdcLogger.get(), readerContext));
+    }
+    // Stream secondary index stats.
+    if (isSecondaryIndexStatsStreamingWritesEnabled || 
writeStatus.isTrackingSuccessfulWrites()) {

Review Comment:
   I don't think we need `writeStatus.isTrackingSuccessfulWrites()`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -108,6 +118,42 @@ public FileGroupReaderBasedMergeHandle(HoodieWriteConfig 
config, String instantT
     } else {
       this.cdcLogger = Option.empty();
     }
+    this.recordIterator = null;
+    init(operation, this.partitionPath, fileSlice.getBaseFile());
+  }
+
+  /**
+   * FG reader based generic merge handle, which is not just for compaction.
+   */
+  public FileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                         Iterator<HoodieRecord<T>> recordItr, 
String partitionPath, String fileId,
+                                         TaskContextSupplier 
taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt,
+                                         HoodieReaderContext<T> readerContext, 
String maxInstantTime,
+                                         HoodieRecord.HoodieRecordType 
enginRecordType) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, 
taskContextSupplier, keyGeneratorOpt);
+    this.maxInstantTime = maxInstantTime;
+    this.keyToNewRecords = Collections.emptyMap();
+    this.readerContext = readerContext;
+    this.fileSlice = null;
+    this.recordIterator = recordItr;
+    this.operation = null;
+    if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
+      this.cdcLogger = Option.of(new HoodieCDCLogger(
+          instantTime,
+          config,
+          hoodieTable.getMetaClient().getTableConfig(),
+          partitionPath,
+          storage,
+          getWriterSchema(),
+          createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX, 
Option.empty()),
+          IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config)));
+    } else {
+      this.cdcLogger = Option.empty();
+    }
+    // If the table is a metadata table or the base file is an HFile, we use 
AVRO record type, otherwise we use the engine record type.
+    this.recordType = (hoodieTable.isMetadataTable()
+        || HFILE.getFileExtension().equals(hoodieTable.getBaseFileExtension()))
+        ? HoodieRecord.HoodieRecordType.AVRO : enginRecordType;

Review Comment:
   I see that `HoodieMergeHandle` contains two constructors (one for COW merge 
and another one for MOR compaction). And we call `getLatestBaseFile()` for COW 
merge flows. 
   We are missing that call here. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to