nsivabalan commented on code in PR #12269:
URL: https://github.com/apache/hudi/pull/12269#discussion_r1849264418


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,77 @@ public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(Hoodi
         });
   }
 
+  static HoodieData<HoodieRecord> 
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+                                                                      
HoodieCommitMetadata commitMetadata,
+                                                                      
HoodieMetadataConfig metadataConfig,
+                                                                      
HoodieTableMetaClient dataTableMetaClient,
+                                                                      int 
writesFileIdEncoding,
+                                                                      String 
instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      // we might need to set some additional variables if we need to process 
log files.
+      boolean anyLogFilesWithDeleteBlocks = 
allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() == 0 
&& writeStat.getNumUpdateWrites() == 0 && writeStat.getNumDeletes() > 0;
+      });
+      Option<String> latestCommitTimestamp = Option.empty();
+      Option<Schema> writerSchemaOpt = Option.empty();
+      if (anyLogFilesWithDeleteBlocks) { // if we have a log file w/ pure 
deletes.
+        latestCommitTimestamp = 
Option.of(dataTableMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime());
+        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+      }
+      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+      StorageConfiguration storageConfiguration = 
dataTableMetaClient.getStorageConf();
+      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+      Option<String> finalLatestCommitTimestamp = latestCommitTimestamp;
+      HoodieData<HoodieRecord> recordIndexRecords = 
engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat -> {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
+            // handle base files
+            if 
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+              return 
BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, 
writesFileIdEncoding, instantTime, storage);
+            } else {
+              // for logs, we only need to process delete blocks for RLI
+              if (writeStat.getNumInserts() == 0 && 
writeStat.getNumUpdateWrites() == 0 && writeStat.getNumDeletes() > 0) {
+                Set<String> deletedRecordKeys = 
getDeletedRecordKeys(dataTableMetaClient.getBasePath().toString() + "/" + 
writeStat.getPath(), dataTableMetaClient,
+                    finalWriterSchemaOpt, maxBufferSize, 
finalLatestCommitTimestamp.get());
+                return deletedRecordKeys.stream().map(recordKey -> 
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+              }
+              // ignore log file data blocks.
+              return new ArrayList<HoodieRecord>().iterator();
+            }
+          });
+
+      // there are chances that same record key from data table has 2 entries 
(1 delete from older partition and 1 insert to newer partition)
+      // lets do reduce by key to ignore the deleted entry.
+      return recordIndexRecords.mapToPair(
+              (SerializablePairFunction<HoodieRecord, HoodieKey, 
HoodieRecord>) t -> Pair.of(t.getKey(), t))
+          .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, 
HoodieRecord>) (record1, record2) -> {
+            boolean isRecord1Deleted = record1.getData() instanceof 
EmptyHoodieRecordPayload;
+            boolean isRecord2Deleted = record2.getData() instanceof 
EmptyHoodieRecordPayload;
+            if (isRecord1Deleted && !isRecord2Deleted) {
+              return record2;
+            } else if (!isRecord1Deleted && isRecord2Deleted) {
+              return record1;
+            } else {
+              throw new HoodieIOException("Two HoodieRecord updates to RLI is 
seen for same record key " + record2.getRecordKey() + ", record 1 : "

Review Comment:
   Nope. This means there is some bug in RLI itself or in some code path. 
   Essentially, in global index use cases, we can't have duplicates. For example, 
even if we disable RLI and fall back to using the GLOBAL_SIMPLE index, we should 
ensure there are no duplicates. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to