nsivabalan commented on code in PR #12653:
URL: https://github.com/apache/hudi/pull/12653#discussion_r1924058446


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -460,4 +473,66 @@ public static <R> HoodieRecord<R> 
createNewTaggedHoodieRecord(HoodieRecord<R> ol
         throw new HoodieIndexException("Unsupported record type: " + 
recordType);
     }
   }
+
+  /**
+   * Register a metadata index.
+   * Index definitions are stored in user-specified path or, by default, in 
.hoodie/.index_defs/index.json.
+   * For the first time, the index definition file will be created if not 
exists.
+   * For the second time, the index definition file will be updated if exists.
+   * Table Config is updated if necessary.
+   */
+  public static void register(HoodieTableMetaClient metaClient, 
HoodieIndexDefinition indexDefinition) {
+    LOG.info("Registering index {} of using {}", 
indexDefinition.getIndexName(), indexDefinition.getIndexType());
+    // build HoodieIndexMetadata and then add to index definition file
+    boolean indexDefnUpdated = 
metaClient.buildIndexDefinition(indexDefinition);
+    if (indexDefnUpdated) {
+      String indexMetaPath = metaClient.getIndexDefinitionPath();
+      // update table config if necessary
+      if 
(!metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key())
+          || 
!metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent()) {
+        
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH,
 FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new 
StoragePath(indexMetaPath)));
+        HoodieTableConfig.update(metaClient.getStorage(), 
metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
+      }
+    }
+  }
+
+  public static HoodieIndexDefinition 
getSecondaryOrExpressionIndexDefinition(HoodieTableMetaClient metaClient, 
String userIndexName, String indexType, Map<String, Map<String, String>> 
columns,
+                                                                              
Map<String, String> options, Map<String, String> tableProperties) throws 
Exception {
+    String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
+        ? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
+        : PARTITION_NAME_EXPRESSION_INDEX_PREFIX + userIndexName;
+    if (indexExists(metaClient, fullIndexName)) {
+      throw new HoodieMetadataIndexException("Index already exists: " + 
userIndexName);
+    }
+    checkArgument(columns.size() == 1, "Only one column can be indexed for 
functional or secondary index.");
+
+    if (!isEligibleForIndexing(metaClient, indexType, tableProperties, 
columns)) {
+      throw new HoodieMetadataIndexException("Not eligible for indexing: " + 
indexType + ", indexName: " + userIndexName);
+    }
+
+    return HoodieIndexDefinition.newBuilder()
+        .withIndexName(fullIndexName)
+        .withIndexType(indexType)
+        .withIndexFunction(options.getOrDefault(EXPRESSION_OPTION, 
IDENTITY_TRANSFORM))
+        .withSourceFields(new ArrayList<>(columns.keySet()))
+        .withIndexOptions(options)
+        .build();
+  }
+
+  public static boolean indexExists(HoodieTableMetaClient metaClient, String 
indexName) {
+    return 
metaClient.getTableConfig().getMetadataPartitions().stream().anyMatch(partition 
-> partition.equals(indexName));
+  }
+
+  private static boolean isEligibleForIndexing(HoodieTableMetaClient 
metaClient, String indexType, Map<String, String> options, Map<String, 
Map<String, String>> columns) throws Exception {

Review Comment:
   is this applicable only to the secondary index, or to other indexes too? 
   If yes, the condition at L527 is not very apparent. I would expect something like 
   ```
   if (curIndex is sec Index && !validateDataTypeForSecondaryIndex(new 
ArrayList<>(columns.keySet()), new 
TableSchemaResolver(metaClient).getTableAvroSchema())
   {
   
   }
   ```
   
   Or, I would rename the method to "isEligibleForSecIndexing". 



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -352,6 +358,33 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("Parallelism to use, when generating expression 
index.");
 
+  public static final ConfigProperty<String> EXPRESSION_INDEX_COLUMN = 
ConfigProperty
+      .key(METADATA_PREFIX + ".index.expression.column")

Review Comment:
   Are these mainly introduced to support index creation via the Spark datasource 
writer? 



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2766,6 +2768,79 @@ private static void validatePayload(int type, 
Map<String, HoodieMetadataFileInfo
     }
   }
 
+  public static Set<String> 
getExpressionIndexPartitionsToInit(MetadataPartitionType partitionType, 
HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataMetaClient) {
+    Set<String> expressionIndexPartitionsToInit = 
getIndexPartitionsToInit(partitionType, dataMetaClient);
+    if (expressionIndexPartitionsToInit.isEmpty()) {
+      if (isNewExpressionIndexDefinitionRequired(metadataConfig, 
dataMetaClient)) {
+        String indexedColumn = metadataConfig.getExpressionIndexColumn();
+        String indexName = metadataConfig.getExpressionIndexName();
+        String indexType = metadataConfig.getExpressionIndexType();
+        // Use a default index name if the indexed column is specified but 
index name is not
+        if (StringUtils.isNullOrEmpty(indexName) && 
StringUtils.nonEmpty(indexedColumn)) {
+          indexName = PARTITION_NAME_EXPRESSION_INDEX_PREFIX + indexedColumn;
+        }
+        // if user defined index name does not contain the expression_index_ 
prefix, then add it
+        if (StringUtils.nonEmpty(indexName) && 
!indexName.startsWith(PARTITION_NAME_EXPRESSION_INDEX_PREFIX)) {
+          indexName = PARTITION_NAME_EXPRESSION_INDEX_PREFIX + indexName;
+        }
+        // Build and register the new index definition
+        HoodieIndexDefinition indexDefinition = 
HoodieIndexDefinition.newBuilder()
+            .withIndexName(indexName)
+            .withIndexType(indexType)
+            .withSourceFields(Collections.singletonList(indexedColumn))
+            .withIndexOptions(metadataConfig.getExpressionIndexOptions())
+            .build();
+        dataMetaClient.buildIndexDefinition(indexDefinition);
+        // Re-fetch the partitions after adding the new definition
+        expressionIndexPartitionsToInit = 
getIndexPartitionsToInit(partitionType, dataMetaClient);
+      } else {
+        return null;
+      }
+    }
+    return expressionIndexPartitionsToInit;
+  }
+
+  public static Set<String> 
getSecondaryIndexPartitionsToInit(MetadataPartitionType partitionType, 
HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataMetaClient) {
+    Set<String> secondaryIndexPartitionsToInit = 
getIndexPartitionsToInit(partitionType, dataMetaClient);

Review Comment:
   Looks like there is some code duplication. Can we templatize it and use it for 
both the expression index and the secondary index?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2766,6 +2768,79 @@ private static void validatePayload(int type, 
Map<String, HoodieMetadataFileInfo
     }
   }
 
+  public static Set<String> 
getExpressionIndexPartitionsToInit(MetadataPartitionType partitionType, 
HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataMetaClient) {
+    Set<String> expressionIndexPartitionsToInit = 
getIndexPartitionsToInit(partitionType, dataMetaClient);
+    if (expressionIndexPartitionsToInit.isEmpty()) {
+      if (isNewExpressionIndexDefinitionRequired(metadataConfig, 
dataMetaClient)) {
+        String indexedColumn = metadataConfig.getExpressionIndexColumn();
+        String indexName = metadataConfig.getExpressionIndexName();
+        String indexType = metadataConfig.getExpressionIndexType();
+        // Use a default index name if the indexed column is specified but 
index name is not
+        if (StringUtils.isNullOrEmpty(indexName) && 
StringUtils.nonEmpty(indexedColumn)) {
+          indexName = PARTITION_NAME_EXPRESSION_INDEX_PREFIX + indexedColumn;
+        }
+        // if user defined index name does not contain the expression_index_ 
prefix, then add it
+        if (StringUtils.nonEmpty(indexName) && 
!indexName.startsWith(PARTITION_NAME_EXPRESSION_INDEX_PREFIX)) {
+          indexName = PARTITION_NAME_EXPRESSION_INDEX_PREFIX + indexName;
+        }
+        // Build and register the new index definition
+        HoodieIndexDefinition indexDefinition = 
HoodieIndexDefinition.newBuilder()
+            .withIndexName(indexName)
+            .withIndexType(indexType)
+            .withSourceFields(Collections.singletonList(indexedColumn))
+            .withIndexOptions(metadataConfig.getExpressionIndexOptions())
+            .build();
+        dataMetaClient.buildIndexDefinition(indexDefinition);
+        // Re-fetch the partitions after adding the new definition
+        expressionIndexPartitionsToInit = 
getIndexPartitionsToInit(partitionType, dataMetaClient);
+      } else {
+        return null;

Review Comment:
   Can we return an empty set instead?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -460,4 +473,61 @@ public static <R> HoodieRecord<R> 
createNewTaggedHoodieRecord(HoodieRecord<R> ol
         throw new HoodieIndexException("Unsupported record type: " + 
recordType);
     }
   }
+
+  /**
+   * Register a metadata index.
+   * Index definitions are stored in user-specified path or, by default, in 
.hoodie/.index_defs/index.json.
+   * For the first time, the index definition file will be created if not 
exists.
+   * For the second time, the index definition file will be updated if exists.
+   * Table Config is updated if necessary.
+   */
+  public static void register(HoodieTableMetaClient metaClient, 
HoodieIndexDefinition indexDefinition) {
+    LOG.info("Registering index {} of using {}", 
indexDefinition.getIndexName(), indexDefinition.getIndexType());
+    // build HoodieIndexMetadata and then add to index definition file
+    boolean indexDefnUpdated = 
metaClient.buildIndexDefinition(indexDefinition);
+    if (indexDefnUpdated) {
+      String indexMetaPath = metaClient.getIndexDefinitionPath();
+      // update table config if necessary
+      if 
(!metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key())
+          || 
!metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent()) {

Review Comment:
   Can you add a "Notes to Reviewer" note for code snippets like this that were just 
moved and are not new changes? It might save some time in reviews.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2766,6 +2768,79 @@ private static void validatePayload(int type, 
Map<String, HoodieMetadataFileInfo
     }
   }
 
+  public static Set<String> 
getExpressionIndexPartitionsToInit(MetadataPartitionType partitionType, 
HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataMetaClient) {
+    Set<String> expressionIndexPartitionsToInit = 
getIndexPartitionsToInit(partitionType, dataMetaClient);
+    if (expressionIndexPartitionsToInit.isEmpty()) {
+      if (isNewExpressionIndexDefinitionRequired(metadataConfig, 
dataMetaClient)) {

Review Comment:
   This is a little unintuitive. 
   I would expect L2772 to return the new index to initialize, 
   but if it turns out to be empty, here we end up checking for a new index definition 
again. So, what does getIndexPartitionsToInit(partitionType, dataMetaClient) 
lack? 
   
   Maybe we can rename the methods to avoid the ambiguity. 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -1524,4 +1524,79 @@ class TestMORDataSource extends 
HoodieSparkClientTestBase with SparkDatasetMixin
     // deleted record should still show in time travel view
     assertEquals(1, timeTravelDF.where(s"_row_key = '$recordKey'").count())
   }
+
+  /**
+   * Test Secondary Index creation through datasource and metadata write 
configs.
+   *
+   * 1. Insert a batch of data with record_index enabled.
+   * 2. Upsert another batch of data with secondary index configs.
+   * 3. Validate that secondary index is created.
+   */
+  @Test
+  def testSecondaryIndexCreation(): Unit = {
+    var (writeOpts, readOpts) = getWriterReaderOpts()
+    writeOpts = writeOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      HoodieCompactionConfig.INLINE_COMPACT.key -> "false",
+      HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key -> "0",
+      HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true",
+      HoodieIndexConfig.INDEX_TYPE.key -> IndexType.RECORD_INDEX.name()
+    )
+    readOpts = readOpts ++ Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+    )
+    initMetaClient(HoodieTableType.MERGE_ON_READ)
+    // Create a MOR table and add 10 records to the table.
+    val records = recordsToStrings(dataGen.generateInserts("000", 
3)).asScala.toSeq
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF.write.format("org.apache.hudi")
+      .options(writeOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val snapshotDF = 
spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
+    assertEquals(3, snapshotDF.count())
+
+    // Upsert another batch with secondary index configs
+    val secondaryIndexName = "idx_rider"
+    val secondaryIndexColumn = "rider"
+    writeOpts = writeOpts ++ Map(
+      HoodieMetadataConfig.SECONDARY_INDEX_ENABLE_PROP.key -> "true",
+      HoodieMetadataConfig.SECONDARY_INDEX_NAME.key -> secondaryIndexName,
+      HoodieMetadataConfig.SECONDARY_INDEX_COLUMN.key -> secondaryIndexColumn
+    )
+    val records2 = recordsToStrings(dataGen.generateInserts("001", 
3)).asScala.toSeq
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 
10))
+    inputDF2.write.format("org.apache.hudi")
+      .options(writeOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    // validate that secondary index is created
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertTrue(metadataPartitionExists(basePath, context, 
PARTITION_NAME_SECONDARY_INDEX_PREFIX + secondaryIndexName))
+    
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_NAME_SECONDARY_INDEX_PREFIX
 + secondaryIndexName))

Review Comment:
   I am just trying to improve usability for users. Index creation is a 
one-off thing which a central team/admin can do, and then individual teams can 
own the regular writes (from an organizational standpoint). 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -460,4 +473,66 @@ public static <R> HoodieRecord<R> 
createNewTaggedHoodieRecord(HoodieRecord<R> ol
         throw new HoodieIndexException("Unsupported record type: " + 
recordType);
     }
   }
+
+  /**
+   * Register a metadata index.
+   * Index definitions are stored in user-specified path or, by default, in 
.hoodie/.index_defs/index.json.
+   * For the first time, the index definition file will be created if not 
exists.
+   * For the second time, the index definition file will be updated if exists.
+   * Table Config is updated if necessary.
+   */
+  public static void register(HoodieTableMetaClient metaClient, 
HoodieIndexDefinition indexDefinition) {
+    LOG.info("Registering index {} of using {}", 
indexDefinition.getIndexName(), indexDefinition.getIndexType());
+    // build HoodieIndexMetadata and then add to index definition file
+    boolean indexDefnUpdated = 
metaClient.buildIndexDefinition(indexDefinition);
+    if (indexDefnUpdated) {
+      String indexMetaPath = metaClient.getIndexDefinitionPath();
+      // update table config if necessary
+      if 
(!metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key())
+          || 
!metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent()) {
+        
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH,
 FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new 
StoragePath(indexMetaPath)));
+        HoodieTableConfig.update(metaClient.getStorage(), 
metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
+      }
+    }
+  }
+
+  public static HoodieIndexDefinition 
getSecondaryOrExpressionIndexDefinition(HoodieTableMetaClient metaClient, 
String userIndexName, String indexType, Map<String, Map<String, String>> 
columns,
+                                                                              
Map<String, String> options, Map<String, String> tableProperties) throws 
Exception {
+    String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
+        ? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
+        : PARTITION_NAME_EXPRESSION_INDEX_PREFIX + userIndexName;
+    if (indexExists(metaClient, fullIndexName)) {
+      throw new HoodieMetadataIndexException("Index already exists: " + 
userIndexName);
+    }
+    checkArgument(columns.size() == 1, "Only one column can be indexed for 
functional or secondary index.");
+
+    if (!isEligibleForIndexing(metaClient, indexType, tableProperties, 
columns)) {
+      throw new HoodieMetadataIndexException("Not eligible for indexing: " + 
indexType + ", indexName: " + userIndexName);
+    }
+
+    return HoodieIndexDefinition.newBuilder()
+        .withIndexName(fullIndexName)
+        .withIndexType(indexType)
+        .withIndexFunction(options.getOrDefault(EXPRESSION_OPTION, 
IDENTITY_TRANSFORM))
+        .withSourceFields(new ArrayList<>(columns.keySet()))
+        .withIndexOptions(options)
+        .build();
+  }
+
+  public static boolean indexExists(HoodieTableMetaClient metaClient, String 
indexName) {
+    return 
metaClient.getTableConfig().getMetadataPartitions().stream().anyMatch(partition 
-> partition.equals(indexName));
+  }
+
+  private static boolean isEligibleForIndexing(HoodieTableMetaClient 
metaClient, String indexType, Map<String, String> options, Map<String, 
Map<String, String>> columns) throws Exception {
+    if (!validateDataTypeForSecondaryIndex(new ArrayList<>(columns.keySet()), 
new TableSchemaResolver(metaClient).getTableAvroSchema())) {
+      return false;
+    }
+    // for secondary index, record index is a must
+    if (indexType.equals(PARTITION_NAME_SECONDARY_INDEX)) {
+      // either record index is enabled or record index partition is already 
present
+      return 
metaClient.getTableConfig().getMetadataPartitions().stream().anyMatch(partition 
-> partition.equals(MetadataPartitionType.RECORD_INDEX.getPartitionPath()))

Review Comment:
   Why can't we move the data type validation at L527 to here (within this if block)?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -220,11 +220,6 @@ public String getIndexDefinitionPath() {
   public boolean buildIndexDefinition(HoodieIndexDefinition indexDefinition) {
     String indexName = indexDefinition.getIndexName();
     boolean isIndexDefnImmutable = 
!indexDefinition.getIndexName().equals(PARTITION_NAME_COLUMN_STATS); // only 
col stats is mutable.
-    if (isIndexDefnImmutable) {

Review Comment:
   Looks like we don't even need L222. Can we remove it then?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1050,6 +1037,7 @@ private Set<String> getMetadataPartitionsToUpdate() {
   }
 
   public void buildMetadataPartitions(HoodieEngineContext engineContext, 
List<HoodieIndexPartitionInfo> indexPartitionInfos, String instantTime) throws 
IOException {
+    // TODO: so we need to create an index definition in case we're coming via 
the indexer

Review Comment:
   Can you link the follow-up JIRA id?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -1524,4 +1524,79 @@ class TestMORDataSource extends 
HoodieSparkClientTestBase with SparkDatasetMixin
     // deleted record should still show in time travel view
     assertEquals(1, timeTravelDF.where(s"_row_key = '$recordKey'").count())
   }
+
+  /**
+   * Test Secondary Index creation through datasource and metadata write 
configs.
+   *
+   * 1. Insert a batch of data with record_index enabled.
+   * 2. Upsert another batch of data with secondary index configs.
+   * 3. Validate that secondary index is created.
+   */
+  @Test
+  def testSecondaryIndexCreation(): Unit = {
+    var (writeOpts, readOpts) = getWriterReaderOpts()
+    writeOpts = writeOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      HoodieCompactionConfig.INLINE_COMPACT.key -> "false",
+      HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key -> "0",
+      HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true",
+      HoodieIndexConfig.INDEX_TYPE.key -> IndexType.RECORD_INDEX.name()
+    )
+    readOpts = readOpts ++ Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+    )
+    initMetaClient(HoodieTableType.MERGE_ON_READ)
+    // Create a MOR table and add 10 records to the table.
+    val records = recordsToStrings(dataGen.generateInserts("000", 
3)).asScala.toSeq
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF.write.format("org.apache.hudi")
+      .options(writeOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val snapshotDF = 
spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
+    assertEquals(3, snapshotDF.count())
+
+    // Upsert another batch with secondary index configs
+    val secondaryIndexName = "idx_rider"
+    val secondaryIndexColumn = "rider"
+    writeOpts = writeOpts ++ Map(
+      HoodieMetadataConfig.SECONDARY_INDEX_ENABLE_PROP.key -> "true",
+      HoodieMetadataConfig.SECONDARY_INDEX_NAME.key -> secondaryIndexName,
+      HoodieMetadataConfig.SECONDARY_INDEX_COLUMN.key -> secondaryIndexColumn
+    )
+    val records2 = recordsToStrings(dataGen.generateInserts("001", 
3)).asScala.toSeq
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 
10))
+    inputDF2.write.format("org.apache.hudi")
+      .options(writeOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    // validate that secondary index is created
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertTrue(metadataPartitionExists(basePath, context, 
PARTITION_NAME_SECONDARY_INDEX_PREFIX + secondaryIndexName))
+    
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_NAME_SECONDARY_INDEX_PREFIX
 + secondaryIndexName))

Review Comment:
   What's the expectation for subsequent writes to the Hudi table?
   Do we expect users to set all secondary-index-related configs for all subsequent 
writes too? 
   Since in your other patch we are introducing an explicit config for MDT 
partitions to delete, maybe the user can skip these secondary-index-related configs in 
subsequent writes and Hudi will still continue to write/populate the SI partition. 
   What's your take? 
   
   If yes, can we enhance the test to validate that? 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java:
##########
@@ -133,6 +133,7 @@ public Option<HoodieIndexPlan> execute() {
 
   private HoodieIndexPartitionInfo 
buildIndexPartitionInfo(MetadataPartitionType partitionType, HoodieInstant 
indexUptoInstant) {
     // for expression index, we need to pass the index name as the partition 
name
+    // TODO: see the index partition info is built correctly. Should we 
register the index here?

Review Comment:
   can we remove the comment then?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -471,6 +471,59 @@ public static MetadataPartitionType 
fromPartitionPath(String partitionPath) {
     throw new IllegalArgumentException("No MetadataPartitionType for partition 
path: " + partitionPath);
   }
 
+  /**
+   * Given metadata config and table config, determine whether a new secondary 
index definition is required.
+   */
+  public static boolean 
isNewSecondaryIndexDefinitionRequired(HoodieMetadataConfig metadataConfig, 
HoodieTableMetaClient dataMetaClient) {
+    String secondaryIndexColumn = metadataConfig.getSecondaryIndexColumn();
+    if (StringUtils.isNullOrEmpty(secondaryIndexColumn)) {
+      return false;
+    }
+    // check the index definition already exists or not for this column
+    List<HoodieIndexDefinition> indexDefinitions = 
getIndexDefinitions(secondaryIndexColumn, PARTITION_NAME_SECONDARY_INDEX, 
dataMetaClient);
+    return indexDefinitions.isEmpty();
+  }
+
+  /**
+   * Given metadata config and table config, determine whether a new 
expression index definition is required.
+   */
+  public static boolean 
isNewExpressionIndexDefinitionRequired(HoodieMetadataConfig metadataConfig, 
HoodieTableMetaClient dataMetaClient) {
+    String expressionIndexColumn = metadataConfig.getExpressionIndexColumn();
+    if (StringUtils.isNullOrEmpty(expressionIndexColumn)) {
+      return false;
+    }
+
+    // check that expr is present in index options
+    Map<String, String> expressionIndexOptions = 
metadataConfig.getExpressionIndexOptions();
+    if (expressionIndexOptions.isEmpty()) {
+      return false;
+    }
+
+    // get all index definitions for this column and index type
+    // check if none of the index definitions has index function matching the 
expression
+    List<HoodieIndexDefinition> indexDefinitions = 
getIndexDefinitions(expressionIndexColumn, PARTITION_NAME_EXPRESSION_INDEX, 
dataMetaClient);
+    return indexDefinitions.isEmpty()
+        || indexDefinitions.stream().noneMatch(indexDefinition -> 
indexDefinition.getIndexFunction().equals(expressionIndexOptions.get(HoodieExpressionIndex.EXPRESSION_OPTION)));

Review Comment:
   Can't we override the equality function for HoodieIndexDefinition? That would 
simplify these checks, right? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to