lokeshj1703 commented on code in PR #12349:
URL: https://github.com/apache/hudi/pull/12349#discussion_r1861798764


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala:
##########
@@ -58,21 +61,60 @@ case class CreateIndexCommand(table: CatalogTable,
       if 
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) {
         if ((metaClient.getTableConfig.getPayloadClass != null && 
!(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName)))
           || (metaClient.getTableConfig.getRecordMergeMode ne 
RecordMergeMode.COMMIT_TIME_ORDERING)) {
-          throw new HoodieIOException("Secondary Index can only be enabled on 
table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set 
to OVERWRITE_WITH_LATEST")
+          throw new HoodieIndexException("Secondary Index can only be enabled 
on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode 
set to OVERWRITE_WITH_LATEST")
         }
       }
-      val extraOpts = options ++ table.properties
+      if 
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
+        extraOpts.asJava.getOrDefault(EXPRESSION_OPTION, 
IDENTITY_FUNCTION).equals(IDENTITY_FUNCTION)) {
+        throw new HoodieIndexException("Currently Column stats Index can only 
be created with a non identity expression")
+      }
+      HoodieSparkIndexClient.getInstance(sparkSession).create(
+        metaClient, indexName, indexType, columnsMap, extraOpts.asJava)
+    } else if 
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) || 
indexName.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)) {
+      
ValidationUtils.checkArgument(matchesRecordKeys(columnsMap.keySet().asScala.toSet,
 metaClient.getTableConfig),
+        "Input columns should match configured record key columns: " + 
metaClient.getTableConfig.getRecordKeyFieldProp)
       HoodieSparkIndexClient.getInstance(sparkSession).create(
         metaClient, indexName, indexType, columnsMap, extraOpts.asJava)
+    } else if (StringUtils.isNullOrEmpty(indexType)) {
+      val columnNames = columnsMap.keySet().asScala.toSet
+      val derivedIndexType: String = if (matchesRecordKeys(columnNames, 
metaClient.getTableConfig)) {
+        HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX
+      } else if (columnNames.size == 1) {
+        HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX
+      } else {
+        throw new HoodieIndexException("Can not create secondary index on more 
than one column at a time")
+      }
+      HoodieSparkIndexClient.getInstance(sparkSession).create(
+        metaClient, indexName, derivedIndexType, columnsMap, extraOpts.asJava)
     } else {
-      throw new HoodieIndexException(String.format("%s is not supported", 
indexType));
+      throw new HoodieIndexException(String.format("%s is not supported", 
indexType))
     }
 
     // Invalidate cached table for queries do not access related table
     // through {@code DefaultSource}
     sparkSession.sessionState.catalog.invalidateCachedTable(tableId)
     Seq.empty
   }
+
+  def matchesRecordKeys(columnNames: Set[String], tableConfig: 
HoodieTableConfig): Boolean = {

Review Comment:
   Addressed



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala:
##########
@@ -58,21 +61,60 @@ case class CreateIndexCommand(table: CatalogTable,
       if 
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) {
         if ((metaClient.getTableConfig.getPayloadClass != null && 
!(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName)))
           || (metaClient.getTableConfig.getRecordMergeMode ne 
RecordMergeMode.COMMIT_TIME_ORDERING)) {
-          throw new HoodieIOException("Secondary Index can only be enabled on 
table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set 
to OVERWRITE_WITH_LATEST")
+          throw new HoodieIndexException("Secondary Index can only be enabled 
on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode 
set to OVERWRITE_WITH_LATEST")
         }
       }
-      val extraOpts = options ++ table.properties
+      if 
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
+        extraOpts.asJava.getOrDefault(EXPRESSION_OPTION, 
IDENTITY_FUNCTION).equals(IDENTITY_FUNCTION)) {
+        throw new HoodieIndexException("Currently Column stats Index can only 
be created with a non identity expression")
+      }
+      HoodieSparkIndexClient.getInstance(sparkSession).create(
+        metaClient, indexName, indexType, columnsMap, extraOpts.asJava)
+    } else if 
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) || 
indexName.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)) {

Review Comment:
   That case has now been removed.



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala:
##########
@@ -58,21 +61,60 @@ case class CreateIndexCommand(table: CatalogTable,
       if 
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) {
         if ((metaClient.getTableConfig.getPayloadClass != null && 
!(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName)))
           || (metaClient.getTableConfig.getRecordMergeMode ne 
RecordMergeMode.COMMIT_TIME_ORDERING)) {
-          throw new HoodieIOException("Secondary Index can only be enabled on 
table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set 
to OVERWRITE_WITH_LATEST")
+          throw new HoodieIndexException("Secondary Index can only be enabled 
on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode 
set to OVERWRITE_WITH_LATEST")
         }
       }
-      val extraOpts = options ++ table.properties
+      if 
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
+        extraOpts.asJava.getOrDefault(EXPRESSION_OPTION, 
IDENTITY_FUNCTION).equals(IDENTITY_FUNCTION)) {
+        throw new HoodieIndexException("Currently Column stats Index can only 
be created with a non identity expression")

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to