codope commented on code in PR #12349:
URL: https://github.com/apache/hudi/pull/12349#discussion_r1861159639
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -332,13 +332,13 @@ public String
getIndexNameWithoutPrefix(HoodieIndexDefinition indexDefinition) {
*/
public static boolean isGenericIndex(String metadataPartitionPath) {
Review Comment:
Let's rename this method and below one of `isGeneric*` to
`isExpressionOrSecondary*`
##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java:
##########
@@ -86,9 +92,45 @@ public static HoodieSparkIndexClient
getInstance(SparkSession sparkSession) {
@Override
public void create(HoodieTableMetaClient metaClient, String userIndexName,
String indexType, Map<String, Map<String, String>> columns, Map<String, String>
options) throws Exception {
+ if (indexType.equals(PARTITION_NAME_SECONDARY_INDEX) ||
indexType.equals(PARTITION_NAME_BLOOM_FILTERS)
+ || indexType.equals(PARTITION_NAME_COLUMN_STATS)) {
+ createGenericIndex(metaClient, userIndexName, indexType, columns,
options);
+ } else {
+ createRecordIndex(metaClient, userIndexName, indexType);
+ }
+ }
+
+ private void createRecordIndex(HoodieTableMetaClient metaClient, String
userIndexName, String indexType) {
Review Comment:
where are we validating that the column(s) provided in the create index
command are all record key fields?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -562,18 +562,18 @@ private Pair<Integer, HoodieData<HoodieRecord>>
initializeFunctionalIndexPartiti
});
});
- int fileGroupCount =
dataWriteConfig.getMetadataConfig().getFunctionalIndexFileGroupCount();
- int parallelism = Math.min(partitionFilePathSizeTriplet.size(),
dataWriteConfig.getMetadataConfig().getFunctionalIndexParallelism());
- Schema readerSchema =
getProjectedSchemaForFunctionalIndex(indexDefinition, dataMetaClient);
- return Pair.of(fileGroupCount,
getFunctionalIndexRecords(partitionFilePathSizeTriplet, indexDefinition,
dataMetaClient, parallelism, readerSchema, storageConf, instantTime));
+ int fileGroupCount =
dataWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
+ int parallelism = Math.min(partitionFilePathSizeTriplet.size(),
dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
+ Schema readerSchema =
getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient);
+ return Pair.of(fileGroupCount,
getExpressionIndexRecords(partitionFilePathSizeTriplet, indexDefinition,
dataMetaClient, parallelism, readerSchema, storageConf, instantTime));
}
- private HoodieIndexDefinition getFunctionalIndexDefinition(String indexName)
{
- Option<HoodieIndexMetadata> functionalIndexMetadata =
dataMetaClient.getIndexMetadata();
- if (functionalIndexMetadata.isPresent()) {
- return
functionalIndexMetadata.get().getIndexDefinitions().get(indexName);
+ private HoodieIndexDefinition getExpressionIndexDefinition(String indexName)
{
Review Comment:
should we just rename it to `getIndexDefinition` as it is used for sec index
too?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala:
##########
@@ -58,21 +61,60 @@ case class CreateIndexCommand(table: CatalogTable,
if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) {
if ((metaClient.getTableConfig.getPayloadClass != null &&
!(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName)))
|| (metaClient.getTableConfig.getRecordMergeMode ne
RecordMergeMode.COMMIT_TIME_ORDERING)) {
- throw new HoodieIOException("Secondary Index can only be enabled on
table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set
to OVERWRITE_WITH_LATEST")
+ throw new HoodieIndexException("Secondary Index can only be enabled
on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode
set to OVERWRITE_WITH_LATEST")
}
}
- val extraOpts = options ++ table.properties
+ if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
+ extraOpts.asJava.getOrDefault(EXPRESSION_OPTION,
IDENTITY_FUNCTION).equals(IDENTITY_FUNCTION)) {
+ throw new HoodieIndexException("Currently Column stats Index can only
be created with a non identity expression")
+ }
Review Comment:
If users simply do `create index index_name on table_name using
column_stats(ts)`, do we get `EXPRESSION_OPTION` then?
##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java:
##########
@@ -86,9 +92,45 @@ public static HoodieSparkIndexClient
getInstance(SparkSession sparkSession) {
@Override
public void create(HoodieTableMetaClient metaClient, String userIndexName,
String indexType, Map<String, Map<String, String>> columns, Map<String, String>
options) throws Exception {
+ if (indexType.equals(PARTITION_NAME_SECONDARY_INDEX) ||
indexType.equals(PARTITION_NAME_BLOOM_FILTERS)
+ || indexType.equals(PARTITION_NAME_COLUMN_STATS)) {
+ createGenericIndex(metaClient, userIndexName, indexType, columns,
options);
+ } else {
+ createRecordIndex(metaClient, userIndexName, indexType);
Review Comment:
Can we mock and add a UT for this client just to validate the conditional
logic?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala:
##########
@@ -58,21 +61,60 @@ case class CreateIndexCommand(table: CatalogTable,
if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) {
if ((metaClient.getTableConfig.getPayloadClass != null &&
!(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName)))
|| (metaClient.getTableConfig.getRecordMergeMode ne
RecordMergeMode.COMMIT_TIME_ORDERING)) {
- throw new HoodieIOException("Secondary Index can only be enabled on
table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set
to OVERWRITE_WITH_LATEST")
+ throw new HoodieIndexException("Secondary Index can only be enabled
on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode
set to OVERWRITE_WITH_LATEST")
}
}
- val extraOpts = options ++ table.properties
+ if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
+ extraOpts.asJava.getOrDefault(EXPRESSION_OPTION,
IDENTITY_FUNCTION).equals(IDENTITY_FUNCTION)) {
+ throw new HoodieIndexException("Currently Column stats Index can only
be created with a non identity expression")
+ }
+ HoodieSparkIndexClient.getInstance(sparkSession).create(
+ metaClient, indexName, indexType, columnsMap, extraOpts.asJava)
+ } else if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) ||
indexName.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)) {
+
ValidationUtils.checkArgument(matchesRecordKeys(columnsMap.keySet().asScala.toSet,
metaClient.getTableConfig),
+ "Input columns should match configured record key columns: " +
metaClient.getTableConfig.getRecordKeyFieldProp)
HoodieSparkIndexClient.getInstance(sparkSession).create(
metaClient, indexName, indexType, columnsMap, extraOpts.asJava)
+ } else if (StringUtils.isNullOrEmpty(indexType)) {
+ val columnNames = columnsMap.keySet().asScala.toSet
+ val derivedIndexType: String = if (matchesRecordKeys(columnNames,
metaClient.getTableConfig)) {
+ HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX
+ } else if (columnNames.size == 1) {
+ HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX
+ } else {
+ throw new HoodieIndexException("Can not create secondary index on more
than one column at a time")
+ }
+ HoodieSparkIndexClient.getInstance(sparkSession).create(
+ metaClient, indexName, derivedIndexType, columnsMap, extraOpts.asJava)
} else {
- throw new HoodieIndexException(String.format("%s is not supported",
indexType));
+ throw new HoodieIndexException(String.format("%s is not supported",
indexType))
}
// Invalidate cached table for queries do not access related table
// through {@code DefaultSource}
sparkSession.sessionState.catalog.invalidateCachedTable(tableId)
Seq.empty
}
+
+ def matchesRecordKeys(columnNames: Set[String], tableConfig:
HoodieTableConfig): Boolean = {
Review Comment:
ok I see this method validates the record key fields. Let's add a UT for this one.
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala:
##########
@@ -58,21 +61,60 @@ case class CreateIndexCommand(table: CatalogTable,
if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) {
if ((metaClient.getTableConfig.getPayloadClass != null &&
!(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName)))
|| (metaClient.getTableConfig.getRecordMergeMode ne
RecordMergeMode.COMMIT_TIME_ORDERING)) {
- throw new HoodieIOException("Secondary Index can only be enabled on
table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set
to OVERWRITE_WITH_LATEST")
+ throw new HoodieIndexException("Secondary Index can only be enabled
on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode
set to OVERWRITE_WITH_LATEST")
}
}
- val extraOpts = options ++ table.properties
+ if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
+ extraOpts.asJava.getOrDefault(EXPRESSION_OPTION,
IDENTITY_FUNCTION).equals(IDENTITY_FUNCTION)) {
+ throw new HoodieIndexException("Currently Column stats Index can only
be created with a non identity expression")
+ }
+ HoodieSparkIndexClient.getInstance(sparkSession).create(
+ metaClient, indexName, indexType, columnsMap, extraOpts.asJava)
+ } else if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) ||
indexName.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)) {
Review Comment:
When can indexType be equal to RECORD_INDEX?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -332,13 +332,13 @@ public String
getIndexNameWithoutPrefix(HoodieIndexDefinition indexDefinition) {
*/
public static boolean isGenericIndex(String metadataPartitionPath) {
return metadataPartitionPath.startsWith(SECONDARY_INDEX.getPartitionPath())
- ||
metadataPartitionPath.startsWith(FUNCTIONAL_INDEX.getPartitionPath());
+ ||
metadataPartitionPath.startsWith(EXPRESSION_INDEX.getPartitionPath());
Review Comment:
can we reuse `MetadataPartitionType.fromPartitionPath`?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala:
##########
@@ -92,7 +92,15 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
Seq("record_index", "record_index", "")
)
- spark.sql(s"create index idx_name on $tableName using
secondary_index(name)")
+ // Secondary index can not be created for two columns at once
+ checkException(s"create index idx_name_price on $tableName using
secondary_index(name,price)")(
Review Comment:
Can we add some positive scenarios for all indexes and validate that the e2e flow
from creation to update to drop works all good?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -473,9 +473,9 @@ case class HoodieFileIndex(spark: SparkSession,
private def validateConfig(): Unit = {
if (isDataSkippingEnabled && (!isMetadataTableEnabled || !isIndexEnabled))
{
- logWarning("Data skipping requires both Metadata Table and at least one
of Column Stats Index, Record Level Index, or Functional Index" +
+ logWarning("Data skipping requires both Metadata Table and at least one
of Column Stats Index, Record Level Index, or Expression Index" +
Review Comment:
Should we fix the log message and condition here? We don't necessarily need
colstats to be enabled on the reader side.
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala:
##########
@@ -58,21 +61,60 @@ case class CreateIndexCommand(table: CatalogTable,
if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) {
if ((metaClient.getTableConfig.getPayloadClass != null &&
!(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName)))
|| (metaClient.getTableConfig.getRecordMergeMode ne
RecordMergeMode.COMMIT_TIME_ORDERING)) {
- throw new HoodieIOException("Secondary Index can only be enabled on
table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set
to OVERWRITE_WITH_LATEST")
+ throw new HoodieIndexException("Secondary Index can only be enabled
on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode
set to OVERWRITE_WITH_LATEST")
}
}
- val extraOpts = options ++ table.properties
+ if
(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
+ extraOpts.asJava.getOrDefault(EXPRESSION_OPTION,
IDENTITY_FUNCTION).equals(IDENTITY_FUNCTION)) {
+ throw new HoodieIndexException("Currently Column stats Index can only
be created with a non identity expression")
Review Comment:
Let's rephrase the error message: `Column stats index without expression on
any column can be created using datasource configs <column stats enable
config> and <columns to index config>`. Point to the doc link in the message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]