codope commented on code in PR #12349:
URL: https://github.com/apache/hudi/pull/12349#discussion_r1861159639


##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -332,13 +332,13 @@ public String getIndexNameWithoutPrefix(HoodieIndexDefinition indexDefinition) {
    */
   public static boolean isGenericIndex(String metadataPartitionPath) {

Review Comment:
   Let's rename this method and the one below from `isGeneric*` to `isExpressionOrSecondary*`.



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java:
##########
@@ -86,9 +92,45 @@ public static HoodieSparkIndexClient getInstance(SparkSession sparkSession) {
 
   @Override
   public void create(HoodieTableMetaClient metaClient, String userIndexName, String indexType, Map<String, Map<String, String>> columns, Map<String, String> options) throws Exception {
+    if (indexType.equals(PARTITION_NAME_SECONDARY_INDEX) || indexType.equals(PARTITION_NAME_BLOOM_FILTERS)
+        || indexType.equals(PARTITION_NAME_COLUMN_STATS)) {
+      createGenericIndex(metaClient, userIndexName, indexType, columns, options);
+    } else {
+      createRecordIndex(metaClient, userIndexName, indexType);
+    }
+  }
+
+  private void createRecordIndex(HoodieTableMetaClient metaClient, String userIndexName, String indexType) {

Review Comment:
   Where are we validating that the column(s) provided in the create index command are all record key fields?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -562,18 +562,18 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeFunctionalIndexPartiti
       });
     });
 
-    int fileGroupCount = dataWriteConfig.getMetadataConfig().getFunctionalIndexFileGroupCount();
-    int parallelism = Math.min(partitionFilePathSizeTriplet.size(), dataWriteConfig.getMetadataConfig().getFunctionalIndexParallelism());
-    Schema readerSchema = getProjectedSchemaForFunctionalIndex(indexDefinition, dataMetaClient);
-    return Pair.of(fileGroupCount, getFunctionalIndexRecords(partitionFilePathSizeTriplet, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime));
+    int fileGroupCount = dataWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
+    int parallelism = Math.min(partitionFilePathSizeTriplet.size(), dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
+    Schema readerSchema = getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient);
+    return Pair.of(fileGroupCount, getExpressionIndexRecords(partitionFilePathSizeTriplet, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime));
   }
 
-  private HoodieIndexDefinition getFunctionalIndexDefinition(String indexName) {
-    Option<HoodieIndexMetadata> functionalIndexMetadata = dataMetaClient.getIndexMetadata();
-    if (functionalIndexMetadata.isPresent()) {
-      return functionalIndexMetadata.get().getIndexDefinitions().get(indexName);
+  private HoodieIndexDefinition getExpressionIndexDefinition(String indexName) {

Review Comment:
   Should we just rename it to `getIndexDefinition`, as it is used for the secondary index too?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala:
##########
@@ -58,21 +61,60 @@ case class CreateIndexCommand(table: CatalogTable,
       if (indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) {
         if ((metaClient.getTableConfig.getPayloadClass != null && !(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName)))
           || (metaClient.getTableConfig.getRecordMergeMode ne RecordMergeMode.COMMIT_TIME_ORDERING)) {
-          throw new HoodieIOException("Secondary Index can only be enabled on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set to OVERWRITE_WITH_LATEST")
+          throw new HoodieIndexException("Secondary Index can only be enabled on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set to OVERWRITE_WITH_LATEST")
         }
       }
-      val extraOpts = options ++ table.properties
+      if (indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
+        extraOpts.asJava.getOrDefault(EXPRESSION_OPTION, IDENTITY_FUNCTION).equals(IDENTITY_FUNCTION)) {
+        throw new HoodieIndexException("Currently Column stats Index can only be created with a non identity expression")
+      }

Review Comment:
   If users simply do `create index index_name on table_name using column_stats(ts)`, do we get `EXPRESSION_OPTION` then?
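   For context on the question: if no expression option is supplied in the statement, a `getOrDefault` lookup falls back to the identity marker and the guard above would throw. A minimal, hypothetical sketch of that lookup behavior — the constant values below are placeholders, not the real Hudi constants:

```java
import java.util.HashMap;
import java.util.Map;

public class ExpressionOptionCheck {
  // Placeholder values standing in for EXPRESSION_OPTION / IDENTITY_FUNCTION.
  static final String EXPRESSION_OPTION = "expr";
  static final String IDENTITY_FUNCTION = "identity";

  // Mirrors extraOpts.asJava.getOrDefault(EXPRESSION_OPTION, IDENTITY_FUNCTION)
  static boolean isIdentityExpression(Map<String, String> extraOpts) {
    return extraOpts.getOrDefault(EXPRESSION_OPTION, IDENTITY_FUNCTION)
        .equals(IDENTITY_FUNCTION);
  }

  public static void main(String[] args) {
    // A statement that passes no expression option falls back to the
    // identity marker, so the column_stats guard would reject it.
    assert isIdentityExpression(new HashMap<>());

    Map<String, String> withExpr = new HashMap<>();
    withExpr.put(EXPRESSION_OPTION, "from_unixtime");
    assert !isIdentityExpression(withExpr);
  }
}
```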



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java:
##########
@@ -86,9 +92,45 @@ public static HoodieSparkIndexClient getInstance(SparkSession sparkSession) {
 
   @Override
   public void create(HoodieTableMetaClient metaClient, String userIndexName, String indexType, Map<String, Map<String, String>> columns, Map<String, String> options) throws Exception {
+    if (indexType.equals(PARTITION_NAME_SECONDARY_INDEX) || indexType.equals(PARTITION_NAME_BLOOM_FILTERS)
+        || indexType.equals(PARTITION_NAME_COLUMN_STATS)) {
+      createGenericIndex(metaClient, userIndexName, indexType, columns, options);
+    } else {
+      createRecordIndex(metaClient, userIndexName, indexType);

Review Comment:
   Can we mock and add a UT for this client, just to validate the conditional logic?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala:
##########
@@ -58,21 +61,60 @@ case class CreateIndexCommand(table: CatalogTable,
       if (indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) {
         if ((metaClient.getTableConfig.getPayloadClass != null && !(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName)))
           || (metaClient.getTableConfig.getRecordMergeMode ne RecordMergeMode.COMMIT_TIME_ORDERING)) {
-          throw new HoodieIOException("Secondary Index can only be enabled on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set to OVERWRITE_WITH_LATEST")
+          throw new HoodieIndexException("Secondary Index can only be enabled on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set to OVERWRITE_WITH_LATEST")
         }
       }
-      val extraOpts = options ++ table.properties
+      if (indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
+        extraOpts.asJava.getOrDefault(EXPRESSION_OPTION, IDENTITY_FUNCTION).equals(IDENTITY_FUNCTION)) {
+        throw new HoodieIndexException("Currently Column stats Index can only be created with a non identity expression")
+      }
+      HoodieSparkIndexClient.getInstance(sparkSession).create(
+        metaClient, indexName, indexType, columnsMap, extraOpts.asJava)
+    } else if (indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) || indexName.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)) {
+      ValidationUtils.checkArgument(matchesRecordKeys(columnsMap.keySet().asScala.toSet, metaClient.getTableConfig),
+        "Input columns should match configured record key columns: " + metaClient.getTableConfig.getRecordKeyFieldProp)
       HoodieSparkIndexClient.getInstance(sparkSession).create(
         metaClient, indexName, indexType, columnsMap, extraOpts.asJava)
+    } else if (StringUtils.isNullOrEmpty(indexType)) {
+      val columnNames = columnsMap.keySet().asScala.toSet
+      val derivedIndexType: String = if (matchesRecordKeys(columnNames, metaClient.getTableConfig)) {
+        HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX
+      } else if (columnNames.size == 1) {
+        HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX
+      } else {
+        throw new HoodieIndexException("Can not create secondary index on more than one column at a time")
+      }
+      HoodieSparkIndexClient.getInstance(sparkSession).create(
+        metaClient, indexName, derivedIndexType, columnsMap, extraOpts.asJava)
     } else {
-      throw new HoodieIndexException(String.format("%s is not supported", indexType));
+      throw new HoodieIndexException(String.format("%s is not supported", indexType))
     }
 
     // Invalidate cached table for queries do not access related table
     // through {@code DefaultSource}
     sparkSession.sessionState.catalog.invalidateCachedTable(tableId)
     Seq.empty
   }
+
+  def matchesRecordKeys(columnNames: Set[String], tableConfig: HoodieTableConfig): Boolean = {

Review Comment:
   OK, I see this method validates the record key fields. Let's add a UT for this one.
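   Since the record key fields come from a comma-separated table property, such a UT would mostly exercise set equality. A hypothetical, self-contained sketch of the expected behavior — the helper, the property format, and the field names here are illustrative, not the actual Hudi code:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RecordKeyMatch {
  // The indexed columns must be exactly the configured record key fields,
  // regardless of ordering in the comma-separated property value.
  static boolean matchesRecordKeys(Set<String> columnNames, String recordKeyFieldProp) {
    Set<String> recordKeys = new HashSet<>(Arrays.asList(recordKeyFieldProp.split(",")));
    return columnNames.equals(recordKeys);
  }

  public static void main(String[] args) {
    assert matchesRecordKeys(new HashSet<>(Arrays.asList("id")), "id");
    // Order in the property should not matter.
    assert matchesRecordKeys(new HashSet<>(Arrays.asList("id", "ts")), "ts,id");
    // A strict subset of the record key fields should not match.
    assert !matchesRecordKeys(new HashSet<>(Arrays.asList("id")), "id,ts");
    assert !matchesRecordKeys(new HashSet<>(Arrays.asList("name")), "id");
  }
}
```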



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala:
##########
@@ -58,21 +61,60 @@ case class CreateIndexCommand(table: CatalogTable,
       if (indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) {
         if ((metaClient.getTableConfig.getPayloadClass != null && !(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName)))
           || (metaClient.getTableConfig.getRecordMergeMode ne RecordMergeMode.COMMIT_TIME_ORDERING)) {
-          throw new HoodieIOException("Secondary Index can only be enabled on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set to OVERWRITE_WITH_LATEST")
+          throw new HoodieIndexException("Secondary Index can only be enabled on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set to OVERWRITE_WITH_LATEST")
         }
       }
-      val extraOpts = options ++ table.properties
+      if (indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
+        extraOpts.asJava.getOrDefault(EXPRESSION_OPTION, IDENTITY_FUNCTION).equals(IDENTITY_FUNCTION)) {
+        throw new HoodieIndexException("Currently Column stats Index can only be created with a non identity expression")
+      }
+      HoodieSparkIndexClient.getInstance(sparkSession).create(
+        metaClient, indexName, indexType, columnsMap, extraOpts.asJava)
+    } else if (indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) || indexName.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)) {

Review Comment:
   When can indexType be equal to RECORD_INDEX?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -332,13 +332,13 @@ public String getIndexNameWithoutPrefix(HoodieIndexDefinition indexDefinition) {
    */
   public static boolean isGenericIndex(String metadataPartitionPath) {
     return metadataPartitionPath.startsWith(SECONDARY_INDEX.getPartitionPath())
-        || metadataPartitionPath.startsWith(FUNCTIONAL_INDEX.getPartitionPath());
+        || metadataPartitionPath.startsWith(EXPRESSION_INDEX.getPartitionPath());

Review Comment:
   Can we reuse `MetadataPartitionType.fromPartitionPath`?
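   A sketch of what such a prefix-based lookup could look like, assuming hypothetical partition path prefixes — the prefix strings and helper names below are placeholders, not the real Hudi values:

```java
public class PartitionTypeLookup {
  enum PartitionType {
    // Placeholder partition path prefixes for illustration only.
    SECONDARY_INDEX("secondary_index_"),
    EXPRESSION_INDEX("expr_index_"),
    COLUMN_STATS("column_stats");

    final String pathPrefix;
    PartitionType(String pathPrefix) { this.pathPrefix = pathPrefix; }
  }

  // Resolve a partition type from the partition path by prefix match,
  // in the spirit of the suggested fromPartitionPath reuse.
  static PartitionType fromPartitionPath(String partitionPath) {
    for (PartitionType t : PartitionType.values()) {
      if (partitionPath.startsWith(t.pathPrefix)) {
        return t;
      }
    }
    throw new IllegalArgumentException("Unknown partition path: " + partitionPath);
  }

  // isGenericIndex could then delegate to the resolved type instead of
  // repeating the startsWith checks.
  static boolean isExpressionOrSecondaryIndex(String partitionPath) {
    PartitionType t = fromPartitionPath(partitionPath);
    return t == PartitionType.SECONDARY_INDEX || t == PartitionType.EXPRESSION_INDEX;
  }

  public static void main(String[] args) {
    assert isExpressionOrSecondaryIndex("secondary_index_idx_name");
    assert isExpressionOrSecondaryIndex("expr_index_idx_ts");
    assert !isExpressionOrSecondaryIndex("column_stats");
  }
}
```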



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala:
##########
@@ -92,7 +92,15 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
             Seq("record_index", "record_index", "")
           )
 
-          spark.sql(s"create index idx_name on $tableName using secondary_index(name)")
+          // Secondary index can not be created for two columns at once
+          checkException(s"create index idx_name_price on $tableName using secondary_index(name,price)")(

Review Comment:
   Can we add some positive scenarios for all indexes and validate end-to-end that creation, update, and drop all work?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -473,9 +473,9 @@ case class HoodieFileIndex(spark: SparkSession,
 
   private def validateConfig(): Unit = {
     if (isDataSkippingEnabled && (!isMetadataTableEnabled || !isIndexEnabled)) {
-      logWarning("Data skipping requires both Metadata Table and at least one of Column Stats Index, Record Level Index, or Functional Index" +
+      logWarning("Data skipping requires both Metadata Table and at least one of Column Stats Index, Record Level Index, or Expression Index" +

Review Comment:
   Should we fix the log message and condition here? We don't necessarily need colstats to be enabled on the reader side.



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala:
##########
@@ -58,21 +61,60 @@ case class CreateIndexCommand(table: CatalogTable,
       if (indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) {
         if ((metaClient.getTableConfig.getPayloadClass != null && !(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName)))
           || (metaClient.getTableConfig.getRecordMergeMode ne RecordMergeMode.COMMIT_TIME_ORDERING)) {
-          throw new HoodieIOException("Secondary Index can only be enabled on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set to OVERWRITE_WITH_LATEST")
+          throw new HoodieIndexException("Secondary Index can only be enabled on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set to OVERWRITE_WITH_LATEST")
         }
       }
-      val extraOpts = options ++ table.properties
+      if (indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
+        extraOpts.asJava.getOrDefault(EXPRESSION_OPTION, IDENTITY_FUNCTION).equals(IDENTITY_FUNCTION)) {
+        throw new HoodieIndexException("Currently Column stats Index can only be created with a non identity expression")

Review Comment:
   Let's rephrase the error message, e.g.: "Column stats index without an expression on any column can be created using the datasource configs `<column stats enable config>` and `<columns to index config>`." Point to the doc link in the message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
