danny0405 commented on code in PR #11146:
URL: https://github.com/apache/hudi/pull/11146#discussion_r1619761496


##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -45,14 +49,14 @@ public class HoodieFunctionalIndexDefinition implements 
Serializable {
   // Any other configuration or properties specific to the index
   private Map<String, String> indexOptions;
 
-  public HoodieFunctionalIndexDefinition() {
+  public HoodieIndexDefinition() {
   }
 
-  public HoodieFunctionalIndexDefinition(String indexName, String indexType, 
String indexFunction, List<String> sourceFields,
-                                         Map<String, String> indexOptions) {
+  public HoodieIndexDefinition(String indexName, String indexType, String 
indexFunction, List<String> sourceFields,
+                               Map<String, String> indexOptions) {
     this.indexName = indexName;
     this.indexType = indexType;
-    this.indexFunction = indexFunction;
+    this.indexFunction = nonEmpty(indexFunction) ? indexFunction : 
SPARK_IDENTITY;

Review Comment:
   Not sure why the empty string binds to spark.



##########
hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java:
##########
@@ -54,6 +54,13 @@ public class KeyGeneratorOptions extends HoodieConfig {
           + "Actual value will be obtained by invoking .toString() on the 
field value. Nested fields can be specified using\n"
           + "the dot notation eg: `a.b.c`");
 
+  public static final ConfigProperty<String> SECONDARYKEY_FIELD_NAME = 
ConfigProperty

Review Comment:
   SECONDARYKEY_COLUMN_NAME



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1833,12 +1837,153 @@ public static HoodieData<HoodieRecord> 
readRecordKeysFromFileSlices(HoodieEngine
     });
   }
 
-  public static Schema 
getProjectedSchemaForFunctionalIndex(HoodieFunctionalIndexDefinition 
indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
+  public static Schema 
getProjectedSchemaForFunctionalIndex(HoodieIndexDefinition indexDefinition, 
HoodieTableMetaClient metaClient) throws Exception {
     TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
     Schema tableSchema = schemaResolver.getTableAvroSchema();
     return addMetadataFields(getSchemaForFields(tableSchema, 
indexDefinition.getSourceFields()));
   }
 
+  public static HoodieData<HoodieRecord> 
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
+                                                                        
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
+                                                                        int 
secondaryIndexMaxParallelism,
+                                                                        String 
activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+                                                                        
HoodieIndexDefinition indexDefinition) {
+    if (partitionFiles.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+    final int parallelism = Math.min(partitionFiles.size(), 
secondaryIndexMaxParallelism);
+    final String basePath = metaClient.getBasePathV2().toString();
+    Schema tableSchema;
+    try {
+      tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get latest schema for " + 
metaClient.getBasePathV2(), e);
+    }
+
+    engineContext.setJobStatus(activeModule, "Secondary Index: reading 
secondary keys from " + partitionFiles.size() + " partitions");
+    return engineContext.parallelize(partitionFiles, 
parallelism).flatMap(partitionAndBaseFile -> {
+      final String partition = partitionAndBaseFile.getKey();
+      final Pair<String, List<String>> baseAndLogFiles = 
partitionAndBaseFile.getValue();
+      List<String> logFilePaths = new ArrayList<>();
+      baseAndLogFiles.getValue().forEach(logFile -> logFilePaths.add(basePath 
+ StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile));
+      String filePath = baseAndLogFiles.getKey();
+      Option<StoragePath> dataFilePath = filePath.isEmpty() ? Option.empty() : 
Option.of(filePath(basePath, partition, filePath));
+      Schema readerSchema;
+      if (dataFilePath.isPresent()) {
+        readerSchema = HoodieIOFactory.getIOFactory(metaClient.getStorage())
+            
.getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat())
+            .readAvroSchema(metaClient.getStorage(), dataFilePath.get());
+      } else {
+        readerSchema = tableSchema;
+      }
+      return createSecondaryIndexGenerator(metaClient, engineType, 
logFilePaths, readerSchema, partition, dataFilePath, indexDefinition);
+    });
+  }
+
+  public static HoodieData<HoodieRecord> 
readSecondaryKeysFromFileSlices(HoodieEngineContext engineContext,
+                                                                         
List<Pair<String, FileSlice>> partitionFileSlicePairs,
+                                                                         int 
secondaryIndexMaxParallelism,
+                                                                         
String activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+                                                                         
HoodieIndexDefinition indexDefinition) {
+    if (partitionFileSlicePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+    final int parallelism = Math.min(partitionFileSlicePairs.size(), 
secondaryIndexMaxParallelism);
+    final String basePath = metaClient.getBasePathV2().toString();
+    Schema tableSchema;
+    try {
+      tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get latest schema for " + 
metaClient.getBasePathV2(), e);
+    }
+
+    engineContext.setJobStatus(activeModule, "Secondary Index: reading 
secondary keys from " + partitionFileSlicePairs.size() + " file slices");
+    return engineContext.parallelize(partitionFileSlicePairs, 
parallelism).flatMap(partitionAndBaseFile -> {
+      final String partition = partitionAndBaseFile.getKey();
+      final FileSlice fileSlice = partitionAndBaseFile.getValue();
+      List<String> logFilePaths = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(l -> 
l.getPath().toString()).collect(toList());
+      Option<StoragePath> dataFilePath = 
Option.ofNullable(fileSlice.getBaseFile().map(baseFile -> filePath(basePath, 
partition, baseFile.getFileName())).orElseGet(null));
+      Schema readerSchema;
+      if (dataFilePath.isPresent()) {
+        readerSchema = HoodieIOFactory.getIOFactory(metaClient.getStorage())
+            
.getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat())
+            .readAvroSchema(metaClient.getStorage(), dataFilePath.get());
+      } else {
+        readerSchema = tableSchema;
+      }
+      return createSecondaryIndexGenerator(metaClient, engineType, 
logFilePaths, readerSchema, partition, dataFilePath, indexDefinition);
+    });
+  }
+
+  private static ClosableIterator<HoodieRecord> 
createSecondaryIndexGenerator(HoodieTableMetaClient metaClient,
+                                                                              
EngineType engineType, List<String> logFilePaths,
+                                                                              
Schema tableSchema, String partition,
+                                                                              
Option<StoragePath> dataFilePath,
+                                                                              
HoodieIndexDefinition indexDefinition) throws Exception {
+    final String basePath = metaClient.getBasePathV2().toString();
+    final StorageConfiguration<?> storageConf = metaClient.getStorageConf();
+
+    HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(
+        basePath,
+        engineType,
+        Collections.emptyList(),
+        metaClient.getTableConfig().getRecordMergerStrategy());
+
+    HoodieMergedLogRecordScanner mergedLogRecordScanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        .withStorage(metaClient.getStorage())
+        .withBasePath(basePath)
+        .withLogFilePaths(logFilePaths)
+        .withReaderSchema(tableSchema)
+        
.withLatestInstantTime(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(""))
+        .withReverseReader(false)
+        
.withMaxMemorySizeInBytes(storageConf.getLong(MAX_MEMORY_FOR_COMPACTION.key(), 
DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES))
+        
.withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue())
+        .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
+        .withPartition(partition)
+        .withOptimizedLogBlocksScan(storageConf.getBoolean("hoodie" + 
HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN, false))

Review Comment:
   `"hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN` the code looks 
weird.



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -351,6 +351,25 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("Parallelism to use, when generating partition stats 
index.");
 
+  public static final ConfigProperty<Boolean> SECONDARY_INDEX_ENABLE_PROP = 
ConfigProperty
+      .key(METADATA_PREFIX + ".index.secondary.enable")

Review Comment:
   enable or enabled?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to