bhat-vinay commented on code in PR #10625:
URL: https://github.com/apache/hudi/pull/10625#discussion_r1505530807


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -82,6 +88,9 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
   public final HoodieTimer timer = HoodieTimer.create();
   // Map of compacted/merged records
   private final ExternalSpillableMap<String, HoodieRecord> records;
+
+  private final ExternalSpillableMap<String, HashMap<String, HoodieRecord>> 
nonUniqueKeyRecords;

Review Comment:
   `ExternalSpillableMap` needs to support a disk-based map here (or we need the 
disk-backed multi-map that you pointed out). I have not really looked into the 
`ExternalSpillableMap` implementation yet.
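
   A minimal sketch of the multi-map idea, for discussion only. 
`NestedMultiMapSketch` is a hypothetical name, not a Hudi class, and a plain 
`java.util.Map` stands in for the disk-backed map. It assumes that a value read 
back from a spilled map may be a deserialized copy, so the inner map is 
re-`put` after every change; that assumption has not been checked against 
`ExternalSpillableMap`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Hudi: a multi-map view over a Map whose
// values are inner maps (outer key -> inner key -> value).
final class NestedMultiMapSketch<V> {
  // Stand-in for a disk-backed map; any Map implementation can be passed in.
  private final Map<String, HashMap<String, V>> backing;

  NestedMultiMapSketch(Map<String, HashMap<String, V>> backing) {
    this.backing = backing;
  }

  void put(String outerKey, String innerKey, V value) {
    HashMap<String, V> inner = backing.get(outerKey);
    if (inner == null) {
      inner = new HashMap<>();
    }
    inner.put(innerKey, value);
    // Re-put so that a backing map which serializes values sees the update.
    backing.put(outerKey, inner);
  }

  // Returns true if a non-null value was stored under (outerKey, innerKey).
  boolean remove(String outerKey, String innerKey) {
    HashMap<String, V> inner = backing.get(outerKey);
    if (inner == null || inner.remove(innerKey) == null) {
      return false;
    }
    if (inner.isEmpty()) {
      backing.remove(outerKey);
    } else {
      backing.put(outerKey, inner);
    }
    return true;
  }

  // Returns a copy of all values stored under outerKey (empty if none).
  Collection<V> get(String outerKey) {
    HashMap<String, V> inner = backing.get(outerKey);
    return inner == null ? new ArrayList<>() : new ArrayList<>(inner.values());
  }
}
```

   The cost of this shape is that every update rewrites the whole inner map, 
which is one reason a real disk-backed multi-map may be preferable.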



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##########
@@ -63,15 +61,21 @@ public HoodieFileSliceReader(Option<HoodieFileReader> 
baseFileReader,
     }
     this.props = props;
     this.simpleKeyGenFieldsOpt = simpleKeyGenFieldsOpt;
-    this.records = scanner.getRecords();
   }
 
   private boolean hasNextInternal() {
     while (baseFileIterator.isPresent() && baseFileIterator.get().hasNext()) {
       try {
         HoodieRecord currentRecord = 
baseFileIterator.get().next().wrapIntoHoodieRecordPayloadWithParams(schema, 
props,
             simpleKeyGenFieldsOpt, scanner.isWithOperationField(), 
scanner.getPartitionNameOverride(), false, Option.empty());
-        Option<HoodieRecord> logRecord = 
removeLogRecord(currentRecord.getRecordKey());
+
+        if (!scanner.hasKey(currentRecord.getRecordKey())) {

Review Comment:
   Will remove. This is leftover code from a previous iteration, which is 
subsumed by the changes below.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1005,6 +1056,73 @@ private HoodieData<HoodieRecord> 
getFunctionalIndexUpdates(HoodieCommitMetadata
     return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, 
dataMetaClient, parallelism, readerSchema, hadoopConf);
   }
 
+  private void updateSecondaryIndexIfPresent(HoodieCommitMetadata 
commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+    dataMetaClient.getTableConfig().getMetadataPartitions()
+        .stream()
+        .filter(partition -> 
partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+        .forEach(partition -> {
+          HoodieData<HoodieRecord> secondaryIndexRecords;
+          try {
+            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, 
writeStatus);
+          } catch (Exception e) {
+            throw new HoodieMetadataException("Failed to get secondary index 
updates for partition " + partition, e);
+          }
+          partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
+        });
+  }
+
+  private HoodieData<HoodieRecord> 
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, 
HoodieData<WriteStatus> writeStatus) throws Exception {
+    // Build a list of basefiles+delta-log-files for every partition that this 
commit touches
+    // {
+    //   {
+    //     "partition1", { {"baseFile11", {"logFile11", "logFile12"}}, 
{"baseFile12", {"logFile11"} } },
+    //   },
+    //   {
+    //     "partition2", { {"baseFile21", {"logFile21", "logFile22"}}, 
{"baseFile22", {"logFile21"} } }
+    //   }
+    // }
+    List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new 
ArrayList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((dataPartition, 
writeStats) -> {
+      writeStats.forEach(writeStat -> {
+        if (writeStat instanceof HoodieDeltaWriteStat) {
+          partitionFilePairs.add(Pair.of(dataPartition, 
Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(), 
((HoodieDeltaWriteStat) writeStat).getLogFiles())));
+        } else {
+          partitionFilePairs.add(Pair.of(dataPartition, 
Pair.of(writeStat.getPath(), new ArrayList<>())));
+        }
+      });
+    });
+
+    // Build a list of keys that need to be removed. A 'delete' record will be 
emitted into the respective FileGroup of
+    // the secondary index partition for each of these keys. For a commit 
which is deleting/updating a lot of records, this
+    // operation is going to be expensive (in CPU, memory and IO)
+    List<String> keysToRemove = new ArrayList<>();
+    writeStatus.collectAsList().forEach(status -> {

Review Comment:
   Yes, the logic here will probably change if `WriteStatus` is used to hold the 
(old-secondary-key, new-secondary-key) pair, so I have not tried to optimise it 
yet.
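
   A rough sketch of how such a pair could drive the delete list, assuming each 
write reports the secondary key before and after the change. `KeyChange` and 
`secondaryKeysToRemove` are hypothetical names for illustration; nothing here 
is an existing `WriteStatus` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

final class SecondaryKeyChangeSketch {
  // Hypothetical value type: the secondary key of one record before and after a write.
  static final class KeyChange {
    final String recordKey;
    final String oldSecondaryKey; // null for a fresh insert
    final String newSecondaryKey; // null for a delete

    KeyChange(String recordKey, String oldSecondaryKey, String newSecondaryKey) {
      this.recordKey = recordKey;
      this.oldSecondaryKey = oldSecondaryKey;
      this.newSecondaryKey = newSecondaryKey;
    }
  }

  // Old secondary keys that need a 'delete' record: the record existed before
  // and its secondary key either changed or went away.
  static List<String> secondaryKeysToRemove(List<KeyChange> changes) {
    List<String> keysToRemove = new ArrayList<>();
    for (KeyChange change : changes) {
      boolean hadOldKey = change.oldSecondaryKey != null;
      boolean keyChanged = !Objects.equals(change.oldSecondaryKey, change.newSecondaryKey);
      if (hadOldKey && keyChanged) {
        keysToRemove.add(change.oldSecondaryKey);
      }
    }
    return keysToRemove;
  }
}
```

   With this shape the keys to remove fall out of the pairs directly, without a 
separate lookup of the previous values.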



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -111,6 +123,16 @@ private HoodieMergedLogRecordScanner(FileSystem fs, String 
basePath, List<String
       // Store merged records for all versions for this log file, set the 
in-memory footprint to maxInMemoryMapSize
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, 
spillableMapBasePath, new DefaultSizeEstimator(),
           new HoodieRecordSizeEstimator(readerSchema), diskMapType, 
isBitCaskDiskMapCompressionEnabled);
+      this.nonUniqueKeyRecords = new 
ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new 
DefaultSizeEstimator(),
+          new HoodieRecordSizeEstimator(readerSchema), diskMapType, 
isBitCaskDiskMapCompressionEnabled);
+
+      if (logFilePaths.size() > 0 && 
HoodieTableMetadata.isMetadataTableSecondaryIndexPartition(basePath, 
partitionName)) {

Review Comment:
   Can this all be hidden inside a method (still in this layer)? There needs to 
be some way to determine whether the logs can have non-unique keys. The initial 
implementation had it one layer above (i.e. the callers instantiating 
`HoodieMergedLogRecordScanner` passed in the flag), but having it here looked 
cleaner.
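
   A small sketch of what "hidden inside a method" could look like. 
`NonUniqueKeyCheckSketch` and `mayHaveNonUniqueKeys` are hypothetical names, 
and a `Predicate<String>` stands in for the secondary-index partition check so 
that the example stays self-contained.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for the scanner; only the decision logic is shown.
final class NonUniqueKeyCheckSketch {
  private final boolean nonUniqueKeysPossible;

  NonUniqueKeyCheckSketch(List<String> logFilePaths, String partitionName,
                          Predicate<String> isSecondaryIndexPartition) {
    // The constructor stays free of the details; callers pass no extra flag.
    this.nonUniqueKeysPossible =
        mayHaveNonUniqueKeys(logFilePaths, partitionName, isSecondaryIndexPartition);
  }

  // True only when there are log files to scan and the partition is a
  // secondary-index partition according to the supplied predicate.
  private static boolean mayHaveNonUniqueKeys(List<String> logFilePaths, String partitionName,
                                              Predicate<String> isSecondaryIndexPartition) {
    return !logFilePaths.isEmpty()
        && partitionName != null
        && isSecondaryIndexPartition.test(partitionName);
  }

  boolean isNonUniqueKeysPossible() {
    return nonUniqueKeysPossible;
  }
}
```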



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -269,6 +289,68 @@ public <T> void processNextRecord(HoodieRecord<T> 
newRecord) throws IOException
     }
   }
 
+  private <T> void processNextNonUniqueKeyRecord(HoodieRecord<T> newRecord) 
throws IOException {
+    String key = newRecord.getRecordKey();
+    HoodieMetadataPayload newPayload = (HoodieMetadataPayload) 
newRecord.getData();
+
+    // The rules for merging the prevRecord and the latestRecord is noted 
below. Note that this only applies for SecondaryIndex

Review Comment:
   `combineAndGet()` (which internally calls `preCombine()`) is an either-or 
operation, i.e. the caller should ensure that `prevRecord` and `newRecord` are 
similar. For the secondary index, this similarity depends on the payload of the 
record (`HoodieMetadataPayload`). Maybe I can add a new API in 
`HoodieRecordPayload` that `HoodieMetadataPayload` implements, and avoid 
exposing `HoodieMetadataPayload` to this layer?
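
   One possible shape for such an API, sketched with stand-in types rather than 
the real Hudi interfaces. `MergeablePayloadSketch`, `isMergeableWith` and 
`SecondaryIndexPayloadSketch` are hypothetical, and the rule that two 
secondary-index payloads are similar when they carry the same record key is an 
assumption made for the example.

```java
import java.util.Objects;

// Stand-in for the payload interface, with one hypothetical addition.
interface MergeablePayloadSketch<T> {
  // Either-or merge: returns whichever payload should win.
  T preCombine(T older);

  // Hypothetical new hook: by default any two payloads for the same key can be merged.
  default boolean isMergeableWith(T other) {
    return true;
  }
}

// Stand-in for a secondary-index payload that knows its own similarity rule.
final class SecondaryIndexPayloadSketch implements MergeablePayloadSketch<SecondaryIndexPayloadSketch> {
  final String recordKey;
  final boolean deleted;

  SecondaryIndexPayloadSketch(String recordKey, boolean deleted) {
    this.recordKey = recordKey;
    this.deleted = deleted;
  }

  @Override
  public SecondaryIndexPayloadSketch preCombine(SecondaryIndexPayloadSketch older) {
    return this; // in this sketch the newer payload always wins
  }

  @Override
  public boolean isMergeableWith(SecondaryIndexPayloadSketch other) {
    // Assumed rule: only payloads pointing at the same record key are similar.
    return Objects.equals(recordKey, other.recordKey);
  }
}

// The scanner layer sees only the interface, never the concrete payload type.
final class ScannerMergeSketch {
  // Returns the merged payload, or null when the two must be kept side by side.
  static <T extends MergeablePayloadSketch<T>> T mergeOrNull(T older, T newer) {
    return newer.isMergeableWith(older) ? newer.preCombine(older) : null;
  }
}
```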



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
