vinothchandar commented on code in PR #10625:
URL: https://github.com/apache/hudi/pull/10625#discussion_r1504979741


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -223,6 +228,11 @@ private void enablePartitions() {
     }
     if (dataWriteConfig.isRecordIndexEnabled() || dataMetaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX)) {
       this.enabledPartitionTypes.add(RECORD_INDEX);
+
+      // Enable secondary index only if record index is enabled
+      if (dataWriteConfig.isSecondaryIndexEnabled() || dataMetaClient.getTableConfig().isMetadataPartitionAvailable(SECONDARY_INDEX)) {
+        this.enabledPartitionTypes.add(SECONDARY_INDEX);

Review Comment:
   nts: rename `SECONDARY_INDEX`? technically the record index itself is a unique secondary index; this is just a non-unique secondary index.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -546,6 +562,39 @@ private HoodieFunctionalIndexDefinition getFunctionalIndexDefinition(String inde
     }
   }
 
+  private Pair<Integer, HoodieData<HoodieRecord>> initializeSecondaryIndexPartition() throws IOException {
+    HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata);
+    // Collect the list of latest file slices present in each partition
+    List<String> partitions = metadata.getAllPartitionPaths();
+    fsView.loadAllPartitions();
+
+    List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
+    partitions.forEach(partition -> {

Review Comment:
   wouldn't this take a long time, if done on the driver?
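
   e.g. a minimal sketch of what pushing this off the driver could look like, using `HoodieEngineContext#flatMap` (assumes the file system view is usable from, or can be rebuilt inside, each task):

   ```java
   // sketch: distribute the per-partition file-slice listing across tasks,
   // instead of looping over all partitions on the driver
   List<Pair<String, FileSlice>> partitionFileSlicePairs = engineContext.flatMap(
       partitions,
       partition -> fsView.getLatestFileSlices(partition).map(fs -> Pair.of(partition, fs)),
       Math.max(1, partitions.size()));
   ```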



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -131,6 +131,12 @@ public class HoodieTableConfig extends HoodieConfig {
       .withDocumentation("Columns used to uniquely identify the table. 
Concatenated values of these fields are used as "
           + " the record key component of HoodieKey.");
 
+  public static final ConfigProperty<String> SECONDARYKEY_FIELDS = ConfigProperty

Review Comment:
   I'd like this to be absorbed into how we are tracking the functional index flow, and not introduce any table-specific configs here.
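
   e.g. a rough sketch, assuming the secondary key can be modeled as an identity "function" over the source column and tracked through the existing functional-index definition (constructor shape approximated, all values hypothetical):

   ```java
   // hypothetical: no new table config; the secondary key rides on the
   // functional-index metadata instead
   HoodieFunctionalIndexDefinition secondaryKeyDef = new HoodieFunctionalIndexDefinition(
       "secondary_index_idx_city",        // hypothetical index name
       "SECONDARY_INDEX",                 // index type discriminator
       "identity",                        // no transform applied to the column
       Collections.singletonList("city"), // hypothetical secondary key column
       Collections.emptyMap());           // no extra options
   ```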



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -111,6 +123,16 @@ private HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String
       // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
           new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
+      this.nonUniqueKeyRecords = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
+          new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
+
+      if (logFilePaths.size() > 0 && HoodieTableMetadata.isMetadataTableSecondaryIndexPartition(basePath, partitionName)) {

Review Comment:
   this layer cannot be aware of the metadata table.
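
   e.g. a minimal sketch of inverting the dependency, where the caller (which does know it is reading a secondary-index partition) passes the behavior in; `withNonUniqueKeyMerging` is a hypothetical builder option:

   ```java
   // hypothetical flag; today the scanner itself calls
   // HoodieTableMetadata.isMetadataTableSecondaryIndexPartition(basePath, partitionName)
   HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
       .withBasePath(basePath)
       .withLogFilePaths(logFilePaths)
       .withNonUniqueKeyMerging(true) // decided by the metadata-table reader, not this layer
       .build();
   ```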



##########
hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java:
##########
@@ -54,6 +54,13 @@ public class KeyGeneratorOptions extends HoodieConfig {
           + "Actual value will be obtained by invoking .toString() on the 
field value. Nested fields can be specified using\n"
           + "the dot notation eg: `a.b.c`");
 
+  public static final ConfigProperty<String> SECONDARYKEY_FIELD_NAME = 
ConfigProperty

Review Comment:
   +1



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -2493,6 +2493,10 @@ public boolean isLogCompactionEnabledOnMetadata() {
     return getBoolean(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE);
   }
 
+  public boolean isSecondaryIndexEnabled() {

Review Comment:
   nts: revisit whether this is needed, or whether we should consolidate with the record level index flag



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -546,6 +562,39 @@ private HoodieFunctionalIndexDefinition getFunctionalIndexDefinition(String inde
     }
   }
 
+  private Pair<Integer, HoodieData<HoodieRecord>> initializeSecondaryIndexPartition() throws IOException {
+    HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata);
+    // Collect the list of latest file slices present in each partition
+    List<String> partitions = metadata.getAllPartitionPaths();
+    fsView.loadAllPartitions();
+
+    List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
+    partitions.forEach(partition -> {
+      fsView.getLatestFileSlices(partition).forEach(fs -> {
+        partitionFileSlicePairs.add(Pair.of(partition, fs));
+      });
+    });
+
+    // Reuse record index parallelism config to build secondary index
+    int parallelism = Math.min(partitionFileSlicePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+
+    HoodieData<HoodieRecord> records = readSecondaryKeysFromFileSlices(
+        engineContext,
+        partitionFileSlicePairs,
+        parallelism,
+        this.getClass().getSimpleName(),
+        dataMetaClient,
+        EngineType.SPARK);
+
+    // Initialize the file groups - using the same estimation logic as that of record index

Review Comment:
   scope for code reuse?
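
   e.g. record-index and secondary-index initialization could share a small helper like the following (hypothetical name, body lifted from the code above):

   ```java
   // sketch: one place to collect the latest file slices per partition, callable
   // from both initializeRecordIndexPartition and initializeSecondaryIndexPartition
   private static List<Pair<String, FileSlice>> collectLatestFileSlices(
       HoodieMetadataFileSystemView fsView, List<String> partitions) {
     List<Pair<String, FileSlice>> pairs = new ArrayList<>();
     partitions.forEach(partition ->
         fsView.getLatestFileSlices(partition).forEach(fs -> pairs.add(Pair.of(partition, fs))));
     return pairs;
   }
   ```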



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##########
@@ -63,15 +61,21 @@ public HoodieFileSliceReader(Option<HoodieFileReader> baseFileReader,
     }
     this.props = props;
     this.simpleKeyGenFieldsOpt = simpleKeyGenFieldsOpt;
-    this.records = scanner.getRecords();
   }
 
   private boolean hasNextInternal() {
     while (baseFileIterator.isPresent() && baseFileIterator.get().hasNext()) {
       try {
         HoodieRecord currentRecord = baseFileIterator.get().next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
             simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionNameOverride(), false, Option.empty());
-        Option<HoodieRecord> logRecord = removeLogRecord(currentRecord.getRecordKey());
+
+        if (!scanner.hasKey(currentRecord.getRecordKey())) {

Review Comment:
   what is this change achieving?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -216,4 +217,19 @@ protected HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, F
   public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, ?> initializeWriteClient() {
     return new SparkRDDWriteClient(engineContext, metadataWriteConfig, true);
   }
+
+  @Override
+  public HoodieData<HoodieRecord> getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext,

Review Comment:
   nts: revisit if this is a necessary extension



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -546,6 +562,39 @@ private HoodieFunctionalIndexDefinition getFunctionalIndexDefinition(String inde
     }
   }
 
+  private Pair<Integer, HoodieData<HoodieRecord>> initializeSecondaryIndexPartition() throws IOException {
+    HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata);
+    // Collect the list of latest file slices present in each partition
+    List<String> partitions = metadata.getAllPartitionPaths();
+    fsView.loadAllPartitions();
+
+    List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
+    partitions.forEach(partition -> {
+      fsView.getLatestFileSlices(partition).forEach(fs -> {
+        partitionFileSlicePairs.add(Pair.of(partition, fs));
+      });
+    });
+
+    // Reuse record index parallelism config to build secondary index
+    int parallelism = Math.min(partitionFileSlicePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+
+    HoodieData<HoodieRecord> records = readSecondaryKeysFromFileSlices(
+        engineContext,
+        partitionFileSlicePairs,
+        parallelism,
+        this.getClass().getSimpleName(),
+        dataMetaClient,
+        EngineType.SPARK);
+
+    // Initialize the file groups - using the same estimation logic as that of record index
+    final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records.count(),

Review Comment:
   this is going to compute `records` twice? no?
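
   e.g. a minimal sketch of avoiding the recomputation by caching before the count (the storage level string and `HoodieData#persist` usage here are assumptions):

   ```java
   records.persist("MEMORY_AND_DISK_SER"); // cache so count() and the write share one computation
   long recordCount = records.count();     // triggers the only computation of the secondary keys
   // estimateFileGroupCount(RECORD_INDEX, recordCount, ...) and the subsequent
   // write of `records` then reuse the cached data
   ```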



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java:
##########
@@ -24,25 +24,29 @@
 import org.apache.hudi.common.util.collection.CachingIterator;
 
 import java.util.Iterator;
-import java.util.Map;
 
 public class LogFileIterator<T> extends CachingIterator<HoodieRecord<T>> {
   HoodieMergedLogRecordScanner scanner;
-  Map<String, HoodieRecord> records;
+  // Map<String, HoodieRecord> records;
   Iterator<HoodieRecord> iterator;
 
-  protected Option<HoodieRecord> removeLogRecord(String key) {
-    return Option.ofNullable(records.remove(key));
+  // protected Option<HoodieRecord> removeLogRecord(String key) {
+  //  return Option.ofNullable(records.remove(key));
+  // }
+
+  protected Option<HoodieRecord> removeLogRecord(HoodieRecord record) {
+    return scanner.remove(record);
   }
 
   public LogFileIterator(HoodieMergedLogRecordScanner scanner) {
     this.scanner = scanner;
-    this.records = scanner.getRecords();
+    // this.records = scanner.getRecords();
   }
 
   private boolean hasNextInternal() {
     if (iterator == null) {
-      iterator = records.values().iterator();
+      // iterator = records.values().iterator();

Review Comment:
   remove?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -82,6 +88,9 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
   public final HoodieTimer timer = HoodieTimer.create();
   // Map of compacted/merged records
   private final ExternalSpillableMap<String, HoodieRecord> records;
+
+  private final ExternalSpillableMap<String, HashMap<String, HoodieRecord>> nonUniqueKeyRecords;

Review Comment:
   serializing the hashmap to the spillable disk map may not be efficient? or is that okay? I wonder if we need another holder object.
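
   e.g. a rough sketch of a dedicated holder (hypothetical class), so the spillable map serializes one explicit type whose layout and size estimation we control, rather than a generic `HashMap`:

   ```java
   import java.io.Serializable;
   import java.util.Collection;
   import java.util.HashMap;
   import java.util.Map;

   import org.apache.hudi.common.model.HoodieRecord;

   // hypothetical holder for all records sharing one secondary key
   public class NonUniqueKeyRecords implements Serializable {
     // record key -> latest merged record for that key (grouping is an assumption)
     private final Map<String, HoodieRecord> byRecordKey = new HashMap<>();

     public void merge(HoodieRecord record) {
       byRecordKey.put(record.getRecordKey(), record);
     }

     public Collection<HoodieRecord> records() {
       return byRecordKey.values();
     }
   }
   ```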



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -269,6 +350,68 @@ public <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException
     }
   }
 
+  private <T> void processNextNonUniqueKeyRecord(HoodieRecord<T> newRecord) throws IOException {

Review Comment:
   I think we should probably subclass, or find some other way to avoid having this code right inside this class?
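
   e.g. one possible shape: pull the merge policy out behind a small interface (names hypothetical), so the non-unique-key branch lives outside `HoodieMergedLogRecordScanner`:

   ```java
   import java.io.IOException;
   import java.util.HashMap;
   import java.util.Map;

   import org.apache.hudi.common.model.HoodieRecord;

   // hypothetical extraction; the scanner would hold one of these and delegate
   interface LogRecordMergePolicy {
     <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException;
   }

   // the non-unique-key variant keeps its own secondaryKey -> (recordKey -> record)
   // state, mirroring the nonUniqueKeyRecords map added in this PR
   class NonUniqueKeyMergePolicy implements LogRecordMergePolicy {
     private final Map<String, Map<String, HoodieRecord>> bySecondaryKey = new HashMap<>();

     @Override
     public <T> void processNextRecord(HoodieRecord<T> newRecord) {
       // assumption: for secondary-index records the record key carries the secondary
       // key; the real merge rules from processNextNonUniqueKeyRecord would move here
       String secondaryKey = newRecord.getRecordKey();
       bySecondaryKey.computeIfAbsent(secondaryKey, k -> new HashMap<>())
           .put(newRecord.getPartitionPath(), newRecord); // inner grouping key is an assumption
     }
   }
   ```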



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -269,6 +289,68 @@ public <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException
     }
   }
 
+  private <T> void processNextNonUniqueKeyRecord(HoodieRecord<T> newRecord) throws IOException {
+    String key = newRecord.getRecordKey();
+    HoodieMetadataPayload newPayload = (HoodieMetadataPayload) newRecord.getData();
+
+    // The rules for merging the prevRecord and the latestRecord are noted below. Note that this only applies for SecondaryIndex

Review Comment:
   hmmm. we should be using the combineAndGet... method of the MetadataPayload?
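
   e.g. a minimal sketch of delegating to the payload's standard merge hook instead of open-coding the rules here (whether `HoodieMetadataPayload.preCombine` can carry the non-unique-key semantics is the open question; `prevRecord` here stands for the previously merged record for this key):

   ```java
   // sketch: let the payload decide how the old and new versions combine
   HoodieMetadataPayload prevPayload = (HoodieMetadataPayload) prevRecord.getData();
   HoodieMetadataPayload combined = newPayload.preCombine(prevPayload);
   ```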



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1005,6 +1056,73 @@ private HoodieData<HoodieRecord> getFunctionalIndexUpdates(HoodieCommitMetadata
     return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, dataMetaClient, parallelism, readerSchema, hadoopConf);
   }
 
+  private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+    dataMetaClient.getTableConfig().getMetadataPartitions()
+        .stream()
+        .filter(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+        .forEach(partition -> {
+          HoodieData<HoodieRecord> secondaryIndexRecords;
+          try {
+            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, writeStatus);
+          } catch (Exception e) {
+            throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e);
+          }
+          partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
+        });
+  }
+
+  private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, HoodieData<WriteStatus> writeStatus) throws Exception {
+    // Build a list of basefiles+delta-log-files for every partition that this commit touches
+    // {
+    //   {
+    //     "partition1", { {"baseFile11", {"logFile11", "logFile12"}}, {"baseFile12", {"logFile11"} } },
+    //   },
+    //   {
+    //     "partition2", { {"baseFile21", {"logFile21", "logFile22"}}, {"baseFile22", {"logFile21"} } }
+    //   }
+    // }
+    List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new ArrayList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> {
+      writeStats.forEach(writeStat -> {
+        if (writeStat instanceof HoodieDeltaWriteStat) {
+          partitionFilePairs.add(Pair.of(dataPartition, Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(), ((HoodieDeltaWriteStat) writeStat).getLogFiles())));
+        } else {
+          partitionFilePairs.add(Pair.of(dataPartition, Pair.of(writeStat.getPath(), new ArrayList<>())));
+        }
+      });
+    });
+
+    // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of
+    // the secondary index partition for each of these keys. For a commit which is deleting/updating a lot of records, this
+    // operation is going to be expensive (in CPU, memory and IO)
+    List<String> keysToRemove = new ArrayList<>();
+    writeStatus.collectAsList().forEach(status -> {

Review Comment:
   this collect can OOM, right?
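
   e.g. a sketch of keeping the deleted keys distributed instead of collecting to the driver (accessor names per the 0.14-era `WriteStatus`/`HoodieRecordDelegate`; treating "no new location" as deleted is an assumption):

   ```java
   // sketch: derive the keys to remove as distributed HoodieData rather than a driver-side list
   HoodieData<String> keysToRemove = writeStatus.flatMap(status ->
       status.getWrittenRecordDelegates().stream()
           .filter(delegate -> !delegate.getNewLocation().isPresent()) // no new location => deleted
           .map(HoodieRecordDelegate::getRecordKey)
           .iterator());
   ```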



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
