nsivabalan commented on code in PR #13445:
URL: https://github.com/apache/hudi/pull/13445#discussion_r2151380390


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -89,6 +92,26 @@ public HoodieInternalRow(UTF8String[] metaFields,
     this.metaFields = metaFields;
     this.sourceRow = sourceRow;
     this.sourceContainsMetaFields = sourceContainsMetaFields;
+    this.isDeleteOperation = false;
+  }
+
+  private HoodieInternalRow(UTF8String recordKey,

Review Comment:
   Since this constructor is private anyway, can you take in `isDeleteOperation` as an argument?
That way we can set it to true at L114. Just from looking at the constructor
arguments, it's not very apparent that this constructor is for the delete operation.



##########
hudi-common/src/main/java/org/apache/hudi/avro/DeleteIndexedRecord.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
+public class DeleteIndexedRecord implements IndexedRecord {
+  private final String[] metaFields;
+  private final IndexedRecord record;
+
+  public DeleteIndexedRecord(String recordKey, String partitionPath, 
IndexedRecord record) {

Review Comment:
   Can we rename this class to `IndexedDeleteRecord`?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -58,6 +58,8 @@ public class HoodieInternalRow extends InternalRow {
    */
   private final UTF8String[] metaFields;
   private final InternalRow sourceRow;
+  // indicates whether this row represents a delete operation. Used in the CDC 
read.
+  private final boolean isDeleteOperation;

Review Comment:
   Why not just `isDeleted`?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2376,35 +2383,29 @@ public static HoodieData<HoodieRecord> 
readRecordKeysFromFileSlices(HoodieEngine
     final int parallelism = Math.min(partitionFileSlicePairs.size(), 
recordIndexMaxParallelism);
     final StoragePath basePath = metaClient.getBasePath();
     final StorageConfiguration<?> storageConf = metaClient.getStorageConf();
+    final Schema tableSchema;
+    try {
+      tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+    } catch (Exception e) {
+      throw new HoodieException("Unable to resolve table schema for table", e);
+    }
+    ReaderContextFactory<T> readerContextFactory = 
engineContext.getReaderContextFactory(metaClient);
     return engineContext.parallelize(partitionFileSlicePairs, 
parallelism).flatMap(partitionAndBaseFile -> {
       final String partition = partitionAndBaseFile.getKey();
       final FileSlice fileSlice = partitionAndBaseFile.getValue();
       if (!fileSlice.getBaseFile().isPresent()) {
-        List<String> logFilePaths = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
-            .map(l -> l.getPath().toString()).collect(Collectors.toList());
-        HoodieMergedLogRecordScanner mergedLogRecordScanner = 
HoodieMergedLogRecordScanner.newBuilder()
-            .withStorage(metaClient.getStorage())
-            .withBasePath(basePath)
-            .withLogFilePaths(logFilePaths)
-            .withReaderSchema(HoodieAvroUtils.getRecordKeySchema())
-            
.withLatestInstantTime(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(""))
-            .withReverseReader(false)
-            .withMaxMemorySizeInBytes(storageConf.getLong(
-                MAX_MEMORY_FOR_COMPACTION.key(), 
DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES))
-            
.withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
-            .withPartition(fileSlice.getPartitionPath())
-            
.withOptimizedLogBlocksScan(storageConf.getBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(),
 false))
-            
.withDiskMapType(storageConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(), 
SPILLABLE_DISK_MAP_TYPE.defaultValue()))
-            .withBitCaskDiskMapCompressionEnabled(storageConf.getBoolean(
-                DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), 
DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-            .withRecordMerger(HoodieRecordUtils.createRecordMerger(
-                metaClient.getBasePath().toString(),
-                engineType,
-                Collections.emptyList(), // TODO: support different merger 
classes, which is currently only known to write config
-                metaClient.getTableConfig().getRecordMergeStrategyId()))
-            .withTableMetaClient(metaClient)
+        HoodieFileGroupReader fileGroupReader = 
HoodieFileGroupReader.<T>newBuilder()
+            .withReaderContext(readerContextFactory.getContext())
+            .withHoodieTableMetaClient(metaClient)
+            .withFileSlice(fileSlice)
+            .withDataSchema(tableSchema)
+            .withRequestedSchema(HoodieAvroUtils.getRecordKeySchema())
+            
.withLatestCommitTime(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(""))

Review Comment:
   Can we compute the latestInstant in the driver and let the executors access it
directly?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1009,40 +1017,38 @@ private static boolean 
isDeleteRecord(HoodieTableMetaClient dataTableMetaClient,
     }
   }
 
-  private static Map<String, HoodieRecord> getLogRecords(List<String> 
logFilePaths,
+  private static <T> ClosableIterator<HoodieRecord<T>> 
getLogRecords(List<String> logFilePaths,
                                                          HoodieTableMetaClient 
datasetMetaClient,
                                                          Option<Schema> 
writerSchemaOpt,
                                                          String 
latestCommitTimestamp,
-                                                         EngineType 
engineType) {
-    if (writerSchemaOpt.isPresent()) {
+                                                         String partitionPath,
+                                                         
HoodieReaderContext<T> readerContext) {
+    if (writerSchemaOpt.isPresent() && !logFilePaths.isEmpty()) {
+      List<HoodieLogFile> logFiles = 
logFilePaths.stream().map(HoodieLogFile::new).collect(Collectors.toList());
+      FileSlice fileSlice = new FileSlice(partitionPath, 
logFiles.get(0).getFileId(), logFiles.get(0).getDeltaCommitTime());

Review Comment:
   The delta commit time of the first log file may not be the file slice's base instant
time, right? But I see we are setting it so here.
   Can you confirm this does not have any other side effects?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java:
##########
@@ -107,7 +107,7 @@ public int hashCode() {
 
   @Override
   public String toString() {
-    return "DeleteRecord {"
+    return "DeleteIndexedRecord {"

Review Comment:
   should this be reverted?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to