lokeshj1703 commented on code in PR #12349:
URL: https://github.com/apache/hudi/pull/12349#discussion_r1861798764
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala:
##########
@@ -58,21 +61,60 @@ case class CreateIndexCommand(table: CatalogTable,
if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) {
if ((metaClient.getTableConfig.getPayloadClass != null &&
!(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName)))
|| (metaClient.getTableConfig.getRecordMergeMode ne
RecordMergeMode.COMMIT_TIME_ORDERING)) {
- throw new HoodieIOException("Secondary Index can only be enabled on
table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set
to OVERWRITE_WITH_LATEST")
+ throw new HoodieIndexException("Secondary Index can only be enabled
on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode
set to OVERWRITE_WITH_LATEST")
}
}
- val extraOpts = options ++ table.properties
+ if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
+ extraOpts.asJava.getOrDefault(EXPRESSION_OPTION,
IDENTITY_FUNCTION).equals(IDENTITY_FUNCTION)) {
+ throw new HoodieIndexException("Currently Column stats Index can only
be created with a non identity expression")
+ }
+ HoodieSparkIndexClient.getInstance(sparkSession).create(
+ metaClient, indexName, indexType, columnsMap, extraOpts.asJava)
+ } else if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) ||
indexName.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)) {
+
ValidationUtils.checkArgument(matchesRecordKeys(columnsMap.keySet().asScala.toSet,
metaClient.getTableConfig),
+ "Input columns should match configured record key columns: " +
metaClient.getTableConfig.getRecordKeyFieldProp)
HoodieSparkIndexClient.getInstance(sparkSession).create(
metaClient, indexName, indexType, columnsMap, extraOpts.asJava)
+ } else if (StringUtils.isNullOrEmpty(indexType)) {
+ val columnNames = columnsMap.keySet().asScala.toSet
+ val derivedIndexType: String = if (matchesRecordKeys(columnNames,
metaClient.getTableConfig)) {
+ HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX
+ } else if (columnNames.size == 1) {
+ HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX
+ } else {
+ throw new HoodieIndexException("Can not create secondary index on more
than one column at a time")
+ }
+ HoodieSparkIndexClient.getInstance(sparkSession).create(
+ metaClient, indexName, derivedIndexType, columnsMap, extraOpts.asJava)
} else {
- throw new HoodieIndexException(String.format("%s is not supported",
indexType));
+ throw new HoodieIndexException(String.format("%s is not supported",
indexType))
}
// Invalidate cached table for queries do not access related table
// through {@code DefaultSource}
sparkSession.sessionState.catalog.invalidateCachedTable(tableId)
Seq.empty
}
+
+ def matchesRecordKeys(columnNames: Set[String], tableConfig:
HoodieTableConfig): Boolean = {
Review Comment:
Addressed
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala:
##########
@@ -58,21 +61,60 @@ case class CreateIndexCommand(table: CatalogTable,
if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) {
if ((metaClient.getTableConfig.getPayloadClass != null &&
!(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName)))
|| (metaClient.getTableConfig.getRecordMergeMode ne
RecordMergeMode.COMMIT_TIME_ORDERING)) {
- throw new HoodieIOException("Secondary Index can only be enabled on
table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set
to OVERWRITE_WITH_LATEST")
+ throw new HoodieIndexException("Secondary Index can only be enabled
on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode
set to OVERWRITE_WITH_LATEST")
}
}
- val extraOpts = options ++ table.properties
+ if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
+ extraOpts.asJava.getOrDefault(EXPRESSION_OPTION,
IDENTITY_FUNCTION).equals(IDENTITY_FUNCTION)) {
+ throw new HoodieIndexException("Currently Column stats Index can only
be created with a non identity expression")
+ }
+ HoodieSparkIndexClient.getInstance(sparkSession).create(
+ metaClient, indexName, indexType, columnsMap, extraOpts.asJava)
+ } else if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) ||
indexName.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)) {
Review Comment:
That case has been removed now.
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala:
##########
@@ -58,21 +61,60 @@ case class CreateIndexCommand(table: CatalogTable,
if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) {
if ((metaClient.getTableConfig.getPayloadClass != null &&
!(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName)))
|| (metaClient.getTableConfig.getRecordMergeMode ne
RecordMergeMode.COMMIT_TIME_ORDERING)) {
- throw new HoodieIOException("Secondary Index can only be enabled on
table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set
to OVERWRITE_WITH_LATEST")
+ throw new HoodieIndexException("Secondary Index can only be enabled
on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode
set to OVERWRITE_WITH_LATEST")
}
}
- val extraOpts = options ++ table.properties
+ if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
+ extraOpts.asJava.getOrDefault(EXPRESSION_OPTION,
IDENTITY_FUNCTION).equals(IDENTITY_FUNCTION)) {
+ throw new HoodieIndexException("Currently Column stats Index can only
be created with a non identity expression")
Review Comment:
Addressed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]