yihua commented on code in PR #14031:
URL: https://github.com/apache/hudi/pull/14031#discussion_r2402094226


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java:
##########
@@ -114,20 +124,60 @@ public static <T> HoodieData<HoodieRecord> 
convertWriteStatsToSecondaryIndexReco
       String fileId = writeStatsByFileIdEntry.getKey();
       List<HoodieWriteStat> writeStats = writeStatsByFileIdEntry.getValue();
       String partition = writeStats.get(0).getPartitionPath();
-      FileSlice previousFileSliceForFileId = 
fsView.getLatestFileSlice(partition, fileId).orElse(null);
+      StoragePath basePath = dataMetaClient.getBasePath();
+
+      // validate that for a given fileId, either we have 1 parquet file or N 
log files.
+      AtomicInteger totalParquetFiles = new AtomicInteger();
+      AtomicInteger totalLogFiles = new AtomicInteger();
+      writeStats.stream().forEach(writeStat -> {
+        if (FSUtils.isLogFile(new StoragePath(basePath, writeStat.getPath()))) 
{
+          totalLogFiles.getAndIncrement();
+        } else {
+          totalParquetFiles.getAndIncrement();
+        }
+      });
+
+      ValidationUtils.checkArgument(!(totalParquetFiles.get() > 0 && 
totalLogFiles.get() > 0), "Either a base file or log files (but not both) are 
expected for a given file group. "
+          + "Partition " + partition + ", fileId " + fileId);
+      if (totalParquetFiles.get() > 0) {
+        // we should expect only 1 parquet file
+        ValidationUtils.checkArgument(writeStats.size() == 1, "Only one new 
parquet file expected per file group per commit");
+      }
+      // Instantiate the remote table file system view (FSV)
+      TableFileSystemView.SliceView sliceView = getSliceView(writeConfig,  
dataMetaClient);
+      Option<FileSlice> fileSliceOption = 
sliceView.getLatestMergedFileSliceBeforeOrOn(partition, instantTime, fileId);
       Map<String, String> recordKeyToSecondaryKeyForPreviousFileSlice;
-      if (previousFileSliceForFileId == null) {
-        // new file slice, so empty mapping for previous slice
-        recordKeyToSecondaryKeyForPreviousFileSlice = Collections.emptyMap();
-      } else {
+      Map<String, String> recordKeyToSecondaryKeyForCurrentFileSlice;
+      if (fileSliceOption.isPresent()) { // if previous file slice is present.
         recordKeyToSecondaryKeyForPreviousFileSlice =
-            getRecordKeyToSecondaryKey(dataMetaClient, 
readerContextFactory.getContext(), previousFileSliceForFileId, tableSchema, 
indexDefinition, instantTime, props, false);
+            getRecordKeyToSecondaryKey(dataMetaClient, 
readerContextFactory.getContext(), fileSliceOption.get(), tableSchema, 
indexDefinition, instantTime, props, false);

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to