danny0405 commented on code in PR #11146:
URL: https://github.com/apache/hudi/pull/11146#discussion_r1616449812


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -396,8 +396,8 @@ public static <R> HoodieRecord<R> 
createNewTaggedHoodieRecord(HoodieRecord<R> ol
    */
   public static String getPartitionNameFromPartitionType(MetadataPartitionType 
partitionType, HoodieTableMetaClient metaClient, String indexName) {
     if (MetadataPartitionType.FUNCTIONAL_INDEX.equals(partitionType)) {
-      checkArgument(metaClient.getFunctionalIndexMetadata().isPresent(), 
"Index definition is not present");
-      return 
metaClient.getFunctionalIndexMetadata().get().getIndexDefinitions().get(indexName).getIndexName();
+      checkArgument(metaClient.getIndexesMetadata().isPresent(), "Index 
definition is not present");

Review Comment:
   `getIndicesMetadata` or just `getIndexMetadata`



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexesMetadata.java:
##########
@@ -76,19 +76,19 @@ public String toJson() throws JsonProcessingException {
    * Deserialize from JSON string to create an instance of this class.
    *
    * @param json Input JSON string.
-   * @return Deserialized instance of HoodieFunctionalIndexMetadata.
+   * @return Deserialized instance of HoodieIndexesMetadata.
    * @throws IOException If any deserialization errors occur.
    */
-  public static HoodieFunctionalIndexMetadata fromJson(String json) throws 
IOException {
+  public static HoodieIndexesMetadata fromJson(String json) throws IOException 
{
     if (json == null || json.isEmpty()) {
-      return new HoodieFunctionalIndexMetadata();
+      return new HoodieIndexesMetadata();
     }
-    return JsonUtils.getObjectMapper().readValue(json, 
HoodieFunctionalIndexMetadata.class);
+    return JsonUtils.getObjectMapper().readValue(json, 
HoodieIndexesMetadata.class);
   }
 
   @Override
   public String toString() {
-    return "HoodieFunctionalIndexMetadata{"
+    return "HoodieIndexesMetadata{"

Review Comment:
   Let's avoid hard-coding the class name; we can fetch it through 
`getClass().getSimpleName()`.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -410,6 +413,14 @@ private boolean initializeFromFilesystem(String 
initializationTime, List<Metadat
             }
             fileGroupCountAndRecordsPair = 
initializePartitionStatsIndex(partitionInfoList);
             break;
+          case SECONDARY_INDEX:
+            Set<String> secondaryIndexPartitionsToInit = 
getSecondaryIndexPartitionsToInit();
+            if (secondaryIndexPartitionsToInit.isEmpty()) {
+              continue;
+            }
+            ValidationUtils.checkState(secondaryIndexPartitionsToInit.size() 
== 1, "Only one secondary index at a time is supported for now");

Review Comment:
   Then what about the other ones? They would never get a chance to bootstrap, 
right?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala:
##########
@@ -77,7 +77,7 @@ class FunctionalIndexSupport(spark: SparkSession,
    * Return true if metadata table is enabled and functional index metadata 
partition is available.
    */
   def isIndexAvailable: Boolean = {
-    metadataConfig.isEnabled && 
metaClient.getFunctionalIndexMetadata.isPresent && 
!metaClient.getFunctionalIndexMetadata.get().getIndexDefinitions.isEmpty
+    metadataConfig.isEnabled && metaClient.getIndexesMetadata.isPresent && 
!metaClient.getIndexesMetadata.get().getIndexDefinitions.isEmpty

Review Comment:
   `getIndexMetadata`



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java:
##########
@@ -112,6 +113,14 @@ static boolean isMetadataTable(String basePath) {
     return basePath.endsWith(HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH);
   }
 
+  static boolean isMetadataTableSecondaryIndexPartition(String basePath, 
Option<String> partitionName) {
+    if (!isMetadataTable(basePath) || !partitionName.isPresent()) {

Review Comment:
   Maybe we can move this utility to `HoodieTableMetadataUtil`.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexesMetadata.java:
##########
@@ -32,29 +32,29 @@
 import java.util.Map;
 
 /**
- * Represents the metadata for all functional indexes in Hudi.
+ * Represents the metadata for all functional and secondary indexes in Hudi.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class HoodieFunctionalIndexMetadata implements Serializable {
+public class HoodieIndexesMetadata implements Serializable {

Review Comment:
   `HoodieIndexMetadata` should be fine.



##########
hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java:
##########
@@ -54,6 +54,13 @@ public class KeyGeneratorOptions extends HoodieConfig {
           + "Actual value will be obtained by invoking .toString() on the 
field value. Nested fields can be specified using\n"
           + "the dot notation eg: `a.b.c`");
 
+  public static final ConfigProperty<String> SECONDARYKEY_FIELD_NAME = 
ConfigProperty
+      .key("hoodie.datasource.write.secondarykey.field")

Review Comment:
   Should this be `column` or `field`? In `HoodieMetadataConfig`, we have 
`*.index.secondary.column`.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1833,12 +1838,146 @@ public static HoodieData<HoodieRecord> 
readRecordKeysFromFileSlices(HoodieEngine
     });
   }
 
-  public static Schema 
getProjectedSchemaForFunctionalIndex(HoodieFunctionalIndexDefinition 
indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
+  public static Schema 
getProjectedSchemaForFunctionalIndex(HoodieIndexDefinition indexDefinition, 
HoodieTableMetaClient metaClient) throws Exception {
     TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
     Schema tableSchema = schemaResolver.getTableAvroSchema();
     return addMetadataFields(getSchemaForFields(tableSchema, 
indexDefinition.getSourceFields()));
   }
 
+  public static HoodieData<HoodieRecord> 
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
+                                                                        
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
+                                                                        int 
recordIndexMaxParallelism,
+                                                                        String 
activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+                                                                        
HoodieIndexDefinition indexDefinition) {
+    if (partitionFiles.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(activeModule, "Secondary Index: reading 
secondary keys from " + partitionFiles.size() + " partitions");
+    final int parallelism = Math.min(partitionFiles.size(), 
recordIndexMaxParallelism);
+    final String basePath = metaClient.getBasePathV2().toString();
+    return engineContext.parallelize(partitionFiles, 
parallelism).flatMap(partitionAndBaseFile -> {
+      final String partition = partitionAndBaseFile.getKey();
+      final Pair<String, List<String>> baseAndLogFiles = 
partitionAndBaseFile.getValue();
+      List<String> logFilePaths = new ArrayList<>();
+      baseAndLogFiles.getValue().forEach(logFile -> {
+        logFilePaths.add(basePath + StoragePath.SEPARATOR + partition + 
StoragePath.SEPARATOR + logFile);
+      });
+      String filePath = baseAndLogFiles.getKey();
+
+      Option<StoragePath> dataFilePath = Option.empty();
+      Schema tableSchema;
+
+      if (!filePath.isEmpty()) {
+        dataFilePath = Option.of(filePath(basePath, "", filePath));
+        tableSchema = 
HoodieIOFactory.getIOFactory(metaClient.getStorage()).getFileFormatUtils(HoodieFileFormat.PARQUET).readAvroSchema(metaClient.getStorage(),
 dataFilePath.get());
+      } else {
+        TableSchemaResolver schemaResolver = new 
TableSchemaResolver(metaClient);

Review Comment:
   Can we fetch the table schema outside of the `flatMap` loop, i.e. on the Spark 
driver? The table schema resolver is a little costly, and it has to list the 
metadata files each time.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -45,14 +48,14 @@ public class HoodieFunctionalIndexDefinition implements 
Serializable {
   // Any other configuration or properties specific to the index
   private Map<String, String> indexOptions;
 
-  public HoodieFunctionalIndexDefinition() {
+  public HoodieIndexDefinition() {
   }
 
-  public HoodieFunctionalIndexDefinition(String indexName, String indexType, 
String indexFunction, List<String> sourceFields,
-                                         Map<String, String> indexOptions) {
+  public HoodieIndexDefinition(String indexName, String indexType, String 
indexFunction, List<String> sourceFields,

Review Comment:
   So after this change, we only have secondary index and the notion of 
functional index is removed.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -222,4 +227,48 @@ protected HoodieTable getHoodieTable(HoodieWriteConfig 
writeConfig, HoodieTableM
   public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, ?> 
initializeWriteClient() {
     return new SparkRDDWriteClient(engineContext, metadataWriteConfig, true);
   }
+
+  @Override
+  protected HoodieData<HoodieRecord> 
getSecondaryIndexRecordsFromFileSlices(List<Pair<String, FileSlice>> 
partitionFileSlicePairs,
+                                                                            
HoodieIndexDefinition indexDefinition,
+                                                                            
int parallelism) throws IOException {

Review Comment:
   Another choice is we add a `getEngineContext` abstract method for 
sub-classes, and these methods impl can move to the base class.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1054,6 +1086,71 @@ private HoodieData<HoodieRecord> 
getFunctionalIndexUpdates(HoodieCommitMetadata
     return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, 
dataMetaClient, parallelism, readerSchema, storageConf);
   }
 
+  private void updateSecondaryIndexIfPresent(HoodieCommitMetadata 
commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+    dataMetaClient.getTableConfig().getMetadataPartitions()
+        .stream()
+        .filter(partition -> 
partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+        .forEach(partition -> {
+          HoodieData<HoodieRecord> secondaryIndexRecords;
+          try {
+            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, 
partition, writeStatus);
+          } catch (Exception e) {
+            throw new HoodieMetadataException("Failed to get secondary index 
updates for partition " + partition, e);
+          }
+          partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
+        });
+  }
+
+  private HoodieData<HoodieRecord> 
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String 
indexPartition, HoodieData<WriteStatus> writeStatus) throws Exception {
+    List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = 
getPartitionFilePairs(commitMetadata);
+    // Build a list of keys that need to be removed. A 'delete' record will be 
emitted into the respective FileGroup of
+    // the secondary index partition for each of these keys. For a commit 
which is deleting/updating a lot of records, this
+    // operation is going to be expensive (in CPU, memory and IO)
+    List<String> keysToRemove = new ArrayList<>();
+    writeStatus.collectAsList().forEach(status -> {
+      status.getWrittenRecordDelegates().forEach(recordDelegate -> {
+        // Consider those keys which were either updated or deleted in this 
commit
+        if (!recordDelegate.getNewLocation().isPresent() || 
(recordDelegate.getCurrentLocation().isPresent() && 
recordDelegate.getNewLocation().isPresent())) {
+          keysToRemove.add(recordDelegate.getRecordKey());
+        }
+      });
+    });
+    HoodieIndexDefinition indexDefinition = 
getFunctionalIndexDefinition(indexPartition);

Review Comment:
   Is the functional index notion already removed?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1833,12 +1838,146 @@ public static HoodieData<HoodieRecord> 
readRecordKeysFromFileSlices(HoodieEngine
     });
   }
 
-  public static Schema 
getProjectedSchemaForFunctionalIndex(HoodieFunctionalIndexDefinition 
indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
+  public static Schema 
getProjectedSchemaForFunctionalIndex(HoodieIndexDefinition indexDefinition, 
HoodieTableMetaClient metaClient) throws Exception {
     TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
     Schema tableSchema = schemaResolver.getTableAvroSchema();
     return addMetadataFields(getSchemaForFields(tableSchema, 
indexDefinition.getSourceFields()));
   }
 
+  public static HoodieData<HoodieRecord> 
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
+                                                                        
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
+                                                                        int 
recordIndexMaxParallelism,
+                                                                        String 
activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+                                                                        
HoodieIndexDefinition indexDefinition) {
+    if (partitionFiles.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(activeModule, "Secondary Index: reading 
secondary keys from " + partitionFiles.size() + " partitions");
+    final int parallelism = Math.min(partitionFiles.size(), 
recordIndexMaxParallelism);
+    final String basePath = metaClient.getBasePathV2().toString();
+    return engineContext.parallelize(partitionFiles, 
parallelism).flatMap(partitionAndBaseFile -> {
+      final String partition = partitionAndBaseFile.getKey();
+      final Pair<String, List<String>> baseAndLogFiles = 
partitionAndBaseFile.getValue();
+      List<String> logFilePaths = new ArrayList<>();
+      baseAndLogFiles.getValue().forEach(logFile -> {
+        logFilePaths.add(basePath + StoragePath.SEPARATOR + partition + 
StoragePath.SEPARATOR + logFile);
+      });
+      String filePath = baseAndLogFiles.getKey();
+
+      Option<StoragePath> dataFilePath = Option.empty();
+      Schema tableSchema;
+
+      if (!filePath.isEmpty()) {
+        dataFilePath = Option.of(filePath(basePath, "", filePath));
+        tableSchema = 
HoodieIOFactory.getIOFactory(metaClient.getStorage()).getFileFormatUtils(HoodieFileFormat.PARQUET).readAvroSchema(metaClient.getStorage(),
 dataFilePath.get());
+      } else {
+        TableSchemaResolver schemaResolver = new 
TableSchemaResolver(metaClient);
+        tableSchema = schemaResolver.getTableAvroSchema();
+      }
+
+      return createSecondaryIndexGenerator(metaClient, engineType, 
logFilePaths, tableSchema, partition, dataFilePath, indexDefinition);
+    });
+  }
+
+  public static HoodieData<HoodieRecord> 
readSecondaryKeysFromFileSlices(HoodieEngineContext engineContext,
+                                                                         
List<Pair<String, FileSlice>> partitionFileSlicePairs,
+                                                                         int 
recordIndexMaxParallelism,
+                                                                         
String activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+                                                                         
HoodieIndexDefinition indexDefinition) throws IOException {
+    if (partitionFileSlicePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(activeModule, "Secondary Index: reading 
secondary keys from " + partitionFileSlicePairs.size() + " file slices");
+    final int parallelism = Math.min(partitionFileSlicePairs.size(), 
recordIndexMaxParallelism);
+    final String basePath = metaClient.getBasePathV2().toString();
+    return engineContext.parallelize(partitionFileSlicePairs, 
parallelism).flatMap(partitionAndBaseFile -> {
+      final String partition = partitionAndBaseFile.getKey();
+      final FileSlice fileSlice = partitionAndBaseFile.getValue();
+
+      final HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+      final String filename = baseFile.getFileName();
+      StoragePath dataFilePath = filePath(basePath, partition, filename);
+      TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+      Schema tableSchema = schemaResolver.getTableAvroSchema();

Review Comment:
   ditto



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1833,12 +1838,146 @@ public static HoodieData<HoodieRecord> 
readRecordKeysFromFileSlices(HoodieEngine
     });
   }
 
-  public static Schema 
getProjectedSchemaForFunctionalIndex(HoodieFunctionalIndexDefinition 
indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
+  public static Schema 
getProjectedSchemaForFunctionalIndex(HoodieIndexDefinition indexDefinition, 
HoodieTableMetaClient metaClient) throws Exception {
     TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
     Schema tableSchema = schemaResolver.getTableAvroSchema();
     return addMetadataFields(getSchemaForFields(tableSchema, 
indexDefinition.getSourceFields()));
   }
 
+  public static HoodieData<HoodieRecord> 
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
+                                                                        
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
+                                                                        int 
recordIndexMaxParallelism,
+                                                                        String 
activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+                                                                        
HoodieIndexDefinition indexDefinition) {
+    if (partitionFiles.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(activeModule, "Secondary Index: reading 
secondary keys from " + partitionFiles.size() + " partitions");
+    final int parallelism = Math.min(partitionFiles.size(), 
recordIndexMaxParallelism);
+    final String basePath = metaClient.getBasePathV2().toString();
+    return engineContext.parallelize(partitionFiles, 
parallelism).flatMap(partitionAndBaseFile -> {
+      final String partition = partitionAndBaseFile.getKey();
+      final Pair<String, List<String>> baseAndLogFiles = 
partitionAndBaseFile.getValue();
+      List<String> logFilePaths = new ArrayList<>();
+      baseAndLogFiles.getValue().forEach(logFile -> {
+        logFilePaths.add(basePath + StoragePath.SEPARATOR + partition + 
StoragePath.SEPARATOR + logFile);
+      });
+      String filePath = baseAndLogFiles.getKey();
+
+      Option<StoragePath> dataFilePath = Option.empty();
+      Schema tableSchema;
+
+      if (!filePath.isEmpty()) {
+        dataFilePath = Option.of(filePath(basePath, "", filePath));
+        tableSchema = 
HoodieIOFactory.getIOFactory(metaClient.getStorage()).getFileFormatUtils(HoodieFileFormat.PARQUET).readAvroSchema(metaClient.getStorage(),
 dataFilePath.get());
+      } else {
+        TableSchemaResolver schemaResolver = new 
TableSchemaResolver(metaClient);
+        tableSchema = schemaResolver.getTableAvroSchema();
+      }
+
+      return createSecondaryIndexGenerator(metaClient, engineType, 
logFilePaths, tableSchema, partition, dataFilePath, indexDefinition);
+    });
+  }
+
+  public static HoodieData<HoodieRecord> 
readSecondaryKeysFromFileSlices(HoodieEngineContext engineContext,
+                                                                         
List<Pair<String, FileSlice>> partitionFileSlicePairs,
+                                                                         int 
recordIndexMaxParallelism,
+                                                                         
String activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+                                                                         
HoodieIndexDefinition indexDefinition) throws IOException {
+    if (partitionFileSlicePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(activeModule, "Secondary Index: reading 
secondary keys from " + partitionFileSlicePairs.size() + " file slices");
+    final int parallelism = Math.min(partitionFileSlicePairs.size(), 
recordIndexMaxParallelism);
+    final String basePath = metaClient.getBasePathV2().toString();
+    return engineContext.parallelize(partitionFileSlicePairs, 
parallelism).flatMap(partitionAndBaseFile -> {
+      final String partition = partitionAndBaseFile.getKey();
+      final FileSlice fileSlice = partitionAndBaseFile.getValue();
+
+      final HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+      final String filename = baseFile.getFileName();
+      StoragePath dataFilePath = filePath(basePath, partition, filename);
+      TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+      Schema tableSchema = schemaResolver.getTableAvroSchema();
+
+      List<String> logFilePaths = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
+          .map(l -> l.getPath().toString()).collect(toList());
+
+      return createSecondaryIndexGenerator(metaClient, engineType, 
logFilePaths, tableSchema, partition, Option.of(dataFilePath), indexDefinition);
+    });
+  }
+
+  private static ClosableIterator<HoodieRecord> 
createSecondaryIndexGenerator(HoodieTableMetaClient metaClient,
+                                                                              
EngineType engineType, List<String> logFilePaths,
+                                                                              
Schema tableSchema, String partition,
+                                                                              
Option<StoragePath> dataFilePath,
+                                                                              
HoodieIndexDefinition indexDefinition) throws Exception {
+    final String basePath = metaClient.getBasePathV2().toString();
+    final StorageConfiguration<?> storageConf = metaClient.getStorageConf();
+
+    HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(
+        basePath,
+        engineType,
+        Collections.emptyList(),
+        metaClient.getTableConfig().getRecordMergerStrategy());
+
+    HoodieMergedLogRecordScanner mergedLogRecordScanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        .withStorage(metaClient.getStorage())
+        .withBasePath(basePath)
+        .withLogFilePaths(logFilePaths)
+        .withReaderSchema(tableSchema)
+        
.withLatestInstantTime(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(""))
+        .withReverseReader(false)
+        
.withMaxMemorySizeInBytes(storageConf.getLong(MAX_MEMORY_FOR_COMPACTION.key(), 
DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES))
+        
.withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue())
+        .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
+        .withPartition(partition)
+        .withOptimizedLogBlocksScan(storageConf.getBoolean("hoodie" + 
HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN, false))
+        .withDiskMapType(storageConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(), 
SPILLABLE_DISK_MAP_TYPE.defaultValue()))
+        
.withBitCaskDiskMapCompressionEnabled(storageConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
 DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
+        .withRecordMerger(recordMerger)
+        .withTableMetaClient(metaClient)
+        .build();
+
+    Option<HoodieFileReader> baseFileReader = Option.empty();
+    if (dataFilePath.isPresent()) {
+      baseFileReader = 
Option.of(HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType()).getFileReader(getReaderConfigs(storageConf),
 dataFilePath.get()));
+    }
+    HoodieFileSliceReader fileSliceReader = new 
HoodieFileSliceReader(baseFileReader, mergedLogRecordScanner, tableSchema, 
metaClient.getTableConfig().getPreCombineField(), recordMerger,
+        metaClient.getTableConfig().getProps(),
+        Option.empty());
+    ClosableIterator<HoodieRecord> fileSliceIterator = 
ClosableIterator.wrap(fileSliceReader);
+    return new ClosableIterator<HoodieRecord>() {
+      @Override
+      public void close() {
+        fileSliceIterator.close();
+      }
+
+      @Override
+      public boolean hasNext() {
+        return fileSliceIterator.hasNext();
+      }
+
+      @Override
+      public HoodieRecord next() {
+        HoodieRecord record = fileSliceIterator.next();
+        String recordKey = record.getRecordKey(tableSchema, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+        String secondaryKeyFields = String.join(".", 
indexDefinition.getSourceFields());
+        String secondaryKey;
+        try {
+          GenericRecord genericRecord = (GenericRecord) 
(record.toIndexedRecord(tableSchema, new Properties()).get()).getData();

Review Comment:
   You can use `CollectionUtils.EMPTY_PROPERTIES` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to