nsivabalan commented on code in PR #12653:
URL: https://github.com/apache/hudi/pull/12653#discussion_r1928165401


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java:
##########
@@ -133,51 +130,43 @@ private void createRecordIndex(HoodieTableMetaClient 
metaClient, String userInde
 
   @Override
   public void createOrUpdateColumnStatsIndexDefinition(HoodieTableMetaClient 
metaClient, List<String> columnsToIndex) {
-    HoodieIndexDefinition indexDefinition = new 
HoodieIndexDefinition(PARTITION_NAME_COLUMN_STATS, PARTITION_NAME_COLUMN_STATS, 
PARTITION_NAME_COLUMN_STATS,
-        columnsToIndex, Collections.EMPTY_MAP);
+    HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
+        .withIndexName(PARTITION_NAME_COLUMN_STATS)
+        .withIndexType(PARTITION_NAME_COLUMN_STATS)
+        .withIndexFunction(PARTITION_NAME_COLUMN_STATS)
+        .withSourceFields(columnsToIndex)
+        .withIndexOptions(Collections.EMPTY_MAP)
+        .build();
     LOG.info("Registering Or Updating the index " + 
PARTITION_NAME_COLUMN_STATS);
     register(metaClient, indexDefinition);
   }
 
   private void createExpressionOrSecondaryIndex(HoodieTableMetaClient 
metaClient, String userIndexName, String indexType,
                                                 Map<String, Map<String, 
String>> columns, Map<String, String> options, Map<String, String> 
tableProperties) throws Exception {
-    String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
-        ? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
-        : PARTITION_NAME_EXPRESSION_INDEX_PREFIX + userIndexName;
-    if (indexExists(metaClient, fullIndexName)) {
-      throw new HoodieMetadataIndexException("Index already exists: " + 
userIndexName);
-    }
-    checkArgument(columns.size() == 1, "Only one column can be indexed for 
functional or secondary index.");
-
-    if (!isEligibleForIndexing(metaClient, indexType, tableProperties, 
columns)) {
-      throw new HoodieMetadataIndexException("Not eligible for indexing: " + 
indexType + ", indexName: " + userIndexName);
-    }
-
-    HoodieIndexDefinition indexDefinition = new 
HoodieIndexDefinition(fullIndexName, indexType, 
options.getOrDefault(EXPRESSION_OPTION, IDENTITY_FUNCTION),
-        new ArrayList<>(columns.keySet()), options);
+    HoodieIndexDefinition indexDefinition = 
HoodieIndexUtils.getSecondaryOrExpressionIndexDefinition(metaClient, 
userIndexName, indexType, columns, options, tableProperties);
     if 
(!metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent()
         || !metaClient.getIndexMetadata().isPresent()
-        || 
!metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(fullIndexName))
 {
+        || 
!metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(indexDefinition.getIndexName()))
 {
       LOG.info("Index definition is not present. Registering the index first");
       register(metaClient, indexDefinition);
     }
 
     ValidationUtils.checkState(metaClient.getIndexMetadata().isPresent(), 
"Index definition is not present");
 
-    LOG.info("Creating index {} of using {}", fullIndexName, indexType);
+    LOG.info("Creating index {} of using {}", indexDefinition.getIndexName(), 
indexType);
     Option<HoodieIndexDefinition> expressionIndexDefinitionOpt = 
Option.ofNullable(indexDefinition);
     try (SparkRDDWriteClient writeClient = getWriteClient(metaClient, 
expressionIndexDefinitionOpt, Option.of(indexType))) {
       MetadataPartitionType partitionType = 
indexType.equals(PARTITION_NAME_SECONDARY_INDEX) ? 
MetadataPartitionType.SECONDARY_INDEX : MetadataPartitionType.EXPRESSION_INDEX;
       // generate index plan
-      Option<String> indexInstantTime = doSchedule(writeClient, metaClient, 
fullIndexName, partitionType);
+      Option<String> indexInstantTime = doSchedule(writeClient, metaClient, 
indexDefinition.getIndexName(), partitionType);
       if (indexInstantTime.isPresent()) {
         // build index
         writeClient.index(indexInstantTime.get());
       } else {
         throw new HoodieMetadataIndexException("Scheduling of index action did 
not return any instant.");
       }
     } catch (Throwable t) {
-      drop(metaClient, fullIndexName, Option.ofNullable(indexDefinition));
+      drop(metaClient, indexDefinition.getIndexName(), 
Option.ofNullable(indexDefinition));

Review Comment:
   Can we add a warn log here stating that indexing failed and that we are 
therefore dropping the index?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -211,38 +210,28 @@ public String getIndexDefinitionPath() {
   }
 
   /**
-   * Builds expression index definition and writes to index definition file.
-   * Support mutable and immutable index definition. Only col stats is 
mutable, while all others are immutable.
-   * Inacse of immutable index definition, we could only create or delete the 
definition.
-   * Incase of mutable (col stats), list of source columns (or list of columns 
to index) could also change.
+   * Builds index definition and writes to index definition file. Support 
mutable and immutable index definition.
+   * For instance, if index definition is mutable (like column stats), list of 
source columns (or list of columns to index) could also change.
+   * If an index definition is present for the index name, it will be updated 
only when there is difference between present and new index definition.
+   *
    * @return true if index definition is updated.
    */
   public boolean buildIndexDefinition(HoodieIndexDefinition indexDefinition) {
     String indexName = indexDefinition.getIndexName();
-    boolean isIndexDefnImmutable = 
!indexDefinition.getIndexName().equals(PARTITION_NAME_COLUMN_STATS); // only 
col stats is mutable.
-    if (isIndexDefnImmutable) {
-      checkState(
-          !indexMetadataOpt.isPresent() || 
(!indexMetadataOpt.get().getIndexDefinitions().containsKey(indexName)),
-          "Index metadata is already present");
-    }
     String indexMetaPath = getIndexDefinitionPath();
     boolean updateIndexDefn = true;
     if (indexMetadataOpt.isPresent()) {
-      if (isIndexDefnImmutable) {
-        indexMetadataOpt.get().getIndexDefinitions().put(indexName, 
indexDefinition);
-      } else {
-        // if index defn is mutable, lets check for difference and only update 
if required.
-        if 
(indexMetadataOpt.get().getIndexDefinitions().containsKey(indexName)) {
-          if 
(!indexMetadataOpt.get().getIndexDefinitions().get(indexName).getSourceFields().equals(indexDefinition.getSourceFields()))
 {
-            LOG.info(String.format("List of columns to index is changing. Old 
value %s. New value %s",
-                
indexMetadataOpt.get().getIndexDefinitions().get(indexName).getSourceFields(), 
indexDefinition.getSourceFields()));
-            indexMetadataOpt.get().getIndexDefinitions().put(indexName, 
indexDefinition);
-          } else {
-            updateIndexDefn = false;
-          }
-        } else {
+      // if index definition is present, lets check for difference and only 
update if required.
+      if (indexMetadataOpt.get().getIndexDefinitions().containsKey(indexName)) 
{
+        if 
(!indexMetadataOpt.get().getIndexDefinitions().get(indexName).getSourceFields().equals(indexDefinition.getSourceFields()))
 {

Review Comment:
   For secondary and expression indexes, is comparing just the source fields 
good enough? Don't we need to compare other attributes such as index type, 
index options, etc.?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -460,4 +473,69 @@ public static <R> HoodieRecord<R> 
createNewTaggedHoodieRecord(HoodieRecord<R> ol
         throw new HoodieIndexException("Unsupported record type: " + 
recordType);
     }
   }
+
+  /**
+   * Register a metadata index.
+   * Index definitions are stored in user-specified path or, by default, in 
.hoodie/.index_defs/index.json.
+   * For the first time, the index definition file will be created if not 
exists.
+   * For the second time, the index definition file will be updated if exists.
+   * Table Config is updated if necessary.
+   */
+  public static void register(HoodieTableMetaClient metaClient, 
HoodieIndexDefinition indexDefinition) {
+    LOG.info("Registering index {} of using {}", 
indexDefinition.getIndexName(), indexDefinition.getIndexType());
+    // build HoodieIndexMetadata and then add to index definition file
+    boolean indexDefnUpdated = 
metaClient.buildIndexDefinition(indexDefinition);
+    if (indexDefnUpdated) {
+      String indexMetaPath = metaClient.getIndexDefinitionPath();
+      // update table config if necessary
+      if 
(!metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key())
+          || 
!metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent()) {
+        
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH,
 FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new 
StoragePath(indexMetaPath)));
+        HoodieTableConfig.update(metaClient.getStorage(), 
metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
+      }
+    }
+  }
+
+  public static HoodieIndexDefinition 
getSecondaryOrExpressionIndexDefinition(HoodieTableMetaClient metaClient, 
String userIndexName, String indexType, Map<String, Map<String, String>> 
columns,
+                                                                              
Map<String, String> options, Map<String, String> tableProperties) throws 
Exception {
+    String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
+        ? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
+        : PARTITION_NAME_EXPRESSION_INDEX_PREFIX + userIndexName;
+    if (indexExists(metaClient, fullIndexName)) {
+      throw new HoodieMetadataIndexException("Index already exists: " + 
userIndexName);
+    }
+    checkArgument(columns.size() == 1, "Only one column can be indexed for 
functional or secondary index.");
+
+    if (!isEligibleForSecondaryOrExpressionIndex(metaClient, indexType, 
tableProperties, columns)) {
+      throw new HoodieMetadataIndexException("Not eligible for indexing: " + 
indexType + ", indexName: " + userIndexName);
+    }
+
+    return HoodieIndexDefinition.newBuilder()
+        .withIndexName(fullIndexName)
+        .withIndexType(indexType)
+        .withIndexFunction(options.getOrDefault(EXPRESSION_OPTION, 
IDENTITY_TRANSFORM))
+        .withSourceFields(new ArrayList<>(columns.keySet()))
+        .withIndexOptions(options)
+        .build();
+  }
+
+  public static boolean indexExists(HoodieTableMetaClient metaClient, String 
indexName) {

Review Comment:
   Why public? 
   If we need it for testing, can you add the `@VisibleForTesting` annotation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to