lokeshj1703 commented on code in PR #12653:
URL: https://github.com/apache/hudi/pull/12653#discussion_r1920041372
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -427,7 +430,27 @@ private void initializeFromFilesystem(String
initializationTime, List<MetadataPa
case EXPRESSION_INDEX:
Set<String> expressionIndexPartitionsToInit =
getIndexPartitionsToInit(partitionType);
if (expressionIndexPartitionsToInit.isEmpty()) {
- continue;
+ if
(isNewExpressionIndexDefinitionRequired(dataWriteConfig.getMetadataConfig(),
dataMetaClient)) {
+ String indexedColumn =
dataWriteConfig.getMetadataConfig().getExpressionIndexColumn();
+ String indexName =
dataWriteConfig.getMetadataConfig().getExpressionIndexName();
+ String indexType =
dataWriteConfig.getMetadataConfig().getExpressionIndexType();
+ // Use a default index name if the indexed column is not
specified
Review Comment:
NIT: in the comment `indexed column is not specified`, the word "not" should be
removed.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -445,6 +468,24 @@ private void initializeFromFilesystem(String
initializationTime, List<MetadataPa
break;
case SECONDARY_INDEX:
Set<String> secondaryIndexPartitionsToInit =
getIndexPartitionsToInit(partitionType);
Review Comment:
Similarly, can we move the major code logic into a separate function for the
secondary index as well?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -460,4 +473,61 @@ public static <R> HoodieRecord<R>
createNewTaggedHoodieRecord(HoodieRecord<R> ol
throw new HoodieIndexException("Unsupported record type: " +
recordType);
}
}
+
+ /**
+ * Register a metadata index.
+ * Index definitions are stored in user-specified path or, by default, in
.hoodie/.index_defs/index.json.
+ * For the first time, the index definition file will be created if not
exists.
+ * For the second time, the index definition file will be updated if exists.
+ * Table Config is updated if necessary.
+ */
+ public static void register(HoodieTableMetaClient metaClient,
HoodieIndexDefinition indexDefinition) {
+ LOG.info("Registering index {} of using {}",
indexDefinition.getIndexName(), indexDefinition.getIndexType());
+ // build HoodieIndexMetadata and then add to index definition file
+ boolean indexDefnUpdated =
metaClient.buildIndexDefinition(indexDefinition);
+ if (indexDefnUpdated) {
+ String indexMetaPath = metaClient.getIndexDefinitionPath();
+ // update table config if necessary
+ if
(!metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key())
+ ||
!metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent()) {
Review Comment:
Do we need the dual check here?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -471,6 +476,47 @@ public static MetadataPartitionType
fromPartitionPath(String partitionPath) {
throw new IllegalArgumentException("No MetadataPartitionType for partition
path: " + partitionPath);
}
+ /**
+ * Given metadata config and table config, determine whether a new secondary
index definition is required.
+ */
+ public static boolean
isNewSecondaryIndexDefinitionRequired(HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataMetaClient) {
+ String secondaryIndexColumn = metadataConfig.getSecondaryIndexColumn();
+ if (StringUtils.isNullOrEmpty(secondaryIndexColumn)) {
+ return false;
+ }
+ // check the index definition already exists or not for this column
+ return dataMetaClient.getIndexMetadata().isEmpty() ||
!isIndexDefinitionPresentForColumn(secondaryIndexColumn,
PARTITION_NAME_SECONDARY_INDEX, dataMetaClient);
+ }
+
+ /**
+ * Given metadata config and table config, determine whether a new
expression index definition is required.
+ */
+ public static boolean
isNewExpressionIndexDefinitionRequired(HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataMetaClient) {
+ String expressionIndexColumn = metadataConfig.getExpressionIndexColumn();
+ if (StringUtils.isNullOrEmpty(expressionIndexColumn)) {
+ return false;
+ }
+
+ // check that expr is present in index options
+ Map<String, String> expressionIndexOptions =
metadataConfig.getExpressionIndexOptions();
+ if (expressionIndexOptions.isEmpty()) {
Review Comment:
The options can be empty for an identity expression index, so returning early
here would incorrectly skip that case.
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -352,6 +358,32 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Parallelism to use, when generating expression
index.");
+ public static final ConfigProperty<String> EXPRESSION_INDEX_COLUMN =
ConfigProperty
+ .key(METADATA_PREFIX + ".index.expression.column")
+ .noDefaultValue()
+ .markAdvanced()
+ .sinceVersion("1.0.0")
Review Comment:
The `sinceVersion` can be 1.0.1 for all the new configs.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -471,6 +476,47 @@ public static MetadataPartitionType
fromPartitionPath(String partitionPath) {
throw new IllegalArgumentException("No MetadataPartitionType for partition
path: " + partitionPath);
}
+ /**
+ * Given metadata config and table config, determine whether a new secondary
index definition is required.
+ */
+ public static boolean
isNewSecondaryIndexDefinitionRequired(HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataMetaClient) {
+ String secondaryIndexColumn = metadataConfig.getSecondaryIndexColumn();
+ if (StringUtils.isNullOrEmpty(secondaryIndexColumn)) {
+ return false;
+ }
+ // check the index definition already exists or not for this column
+ return dataMetaClient.getIndexMetadata().isEmpty() ||
!isIndexDefinitionPresentForColumn(secondaryIndexColumn,
PARTITION_NAME_SECONDARY_INDEX, dataMetaClient);
+ }
+
+ /**
+ * Given metadata config and table config, determine whether a new
expression index definition is required.
+ */
+ public static boolean
isNewExpressionIndexDefinitionRequired(HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataMetaClient) {
+ String expressionIndexColumn = metadataConfig.getExpressionIndexColumn();
+ if (StringUtils.isNullOrEmpty(expressionIndexColumn)) {
+ return false;
+ }
+
+ // check that expr is present in index options
+ Map<String, String> expressionIndexOptions =
metadataConfig.getExpressionIndexOptions();
+ if (expressionIndexOptions.isEmpty()) {
+ return false;
+ }
+
+ String expression =
expressionIndexOptions.get(HoodieExpressionIndex.EXPRESSION_OPTION);
+ if (StringUtils.isNullOrEmpty(expression)) {
+ return false;
Review Comment:
The expression can be null or empty for an identity expression index, so we
should not return false here.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -471,6 +476,47 @@ public static MetadataPartitionType
fromPartitionPath(String partitionPath) {
throw new IllegalArgumentException("No MetadataPartitionType for partition
path: " + partitionPath);
}
+ /**
+ * Given metadata config and table config, determine whether a new secondary
index definition is required.
+ */
+ public static boolean
isNewSecondaryIndexDefinitionRequired(HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataMetaClient) {
Review Comment:
We can add unit tests for the new functions in this class.
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -352,6 +358,32 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Parallelism to use, when generating expression
index.");
+ public static final ConfigProperty<String> EXPRESSION_INDEX_COLUMN =
ConfigProperty
+ .key(METADATA_PREFIX + ".index.expression.column")
+ .noDefaultValue()
+ .markAdvanced()
+ .sinceVersion("1.0.0")
+ .withDocumentation("Column for which expression index will be built.");
+ public static final ConfigProperty<String> EXPRESSION_INDEX_NAME =
ConfigProperty
Review Comment:
NIT: add a blank line between the config definitions.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -126,6 +128,53 @@ public String getIndexType() {
return indexType;
}
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private String indexName;
+ private String indexType;
+ private String indexFunction;
+ private List<String> sourceFields;
+ private Map<String, String> indexOptions;
+
+ public Builder() {
+ this.sourceFields = new ArrayList<>();
+ this.indexOptions = new HashMap<>();
+ }
+
+ public Builder withIndexName(String indexName) {
+ this.indexName = indexName;
+ return this;
+ }
+
+ public Builder withIndexType(String indexType) {
+ this.indexType = indexType;
+ return this;
+ }
+
+ public Builder withIndexFunction(String indexFunction) {
+ this.indexFunction = indexFunction;
+ return this;
+ }
+
+ public Builder withSourceFields(List<String> sourceFields) {
+ this.sourceFields = sourceFields;
+ return this;
+ }
+
+ public Builder withIndexOptions(Map<String, String> indexOptions) {
+ this.indexOptions = indexOptions;
+ return this;
+ }
+
+ public HoodieIndexDefinition build() {
Review Comment:
NIT: We can replace the callers of the HoodieIndexDefinition constructor with
this builder's build function.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -427,7 +430,27 @@ private void initializeFromFilesystem(String
initializationTime, List<MetadataPa
case EXPRESSION_INDEX:
Set<String> expressionIndexPartitionsToInit =
getIndexPartitionsToInit(partitionType);
if (expressionIndexPartitionsToInit.isEmpty()) {
- continue;
+ if
(isNewExpressionIndexDefinitionRequired(dataWriteConfig.getMetadataConfig(),
dataMetaClient)) {
Review Comment:
Can we move this code block to a separate function for better readability?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -627,6 +668,7 @@ private Set<String>
getIndexPartitionsToInit(MetadataPartitionType partitionType
}
private Pair<Integer, HoodieData<HoodieRecord>>
initializeSecondaryIndexPartition(String indexName) throws IOException {
+ // TODO: does index definition already exist at this point, in case we're
coming via the indexer?
Review Comment:
Should we create a JIRA ticket to track this TODO?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java:
##########
@@ -133,6 +133,7 @@ public Option<HoodieIndexPlan> execute() {
private HoodieIndexPartitionInfo
buildIndexPartitionInfo(MetadataPartitionType partitionType, HoodieInstant
indexUptoInstant) {
// for expression index, we need to pass the index name as the partition
name
+ // TODO: see the index partition info is built correctly. Should we
register the index here?
Review Comment:
Should we create a follow-up JIRA for this TODO?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -1524,4 +1526,59 @@ class TestMORDataSource extends
HoodieSparkClientTestBase with SparkDatasetMixin
// deleted record should still show in time travel view
assertEquals(1, timeTravelDF.where(s"_row_key = '$recordKey'").count())
}
+
+ /**
+ * Test Secondary Index creation through datasource and metadata write
configs.
+ *
+ * 1. Insert a batch of data with record_index enabled.
+ * 2. Upsert another batch of data with secondary index configs.
+ * 3. Validate that secondary index is created.
+ */
+ @Test
+ def testSecondaryIndexCreation(): Unit = {
Review Comment:
NIT: Should we move the test to the existing Secondary index test classes?
It will be easier to find the tests then.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -471,6 +476,47 @@ public static MetadataPartitionType
fromPartitionPath(String partitionPath) {
throw new IllegalArgumentException("No MetadataPartitionType for partition
path: " + partitionPath);
}
+ /**
+ * Given metadata config and table config, determine whether a new secondary
index definition is required.
+ */
+ public static boolean
isNewSecondaryIndexDefinitionRequired(HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataMetaClient) {
+ String secondaryIndexColumn = metadataConfig.getSecondaryIndexColumn();
+ if (StringUtils.isNullOrEmpty(secondaryIndexColumn)) {
+ return false;
+ }
+ // check the index definition already exists or not for this column
+ return dataMetaClient.getIndexMetadata().isEmpty() ||
!isIndexDefinitionPresentForColumn(secondaryIndexColumn,
PARTITION_NAME_SECONDARY_INDEX, dataMetaClient);
+ }
+
+ /**
+ * Given metadata config and table config, determine whether a new
expression index definition is required.
+ */
+ public static boolean
isNewExpressionIndexDefinitionRequired(HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataMetaClient) {
+ String expressionIndexColumn = metadataConfig.getExpressionIndexColumn();
+ if (StringUtils.isNullOrEmpty(expressionIndexColumn)) {
+ return false;
+ }
+
+ // check that expr is present in index options
+ Map<String, String> expressionIndexOptions =
metadataConfig.getExpressionIndexOptions();
+ if (expressionIndexOptions.isEmpty()) {
+ return false;
+ }
+
+ String expression =
expressionIndexOptions.get(HoodieExpressionIndex.EXPRESSION_OPTION);
+ if (StringUtils.isNullOrEmpty(expression)) {
+ return false;
+ }
+
+ // check the index definition already exists or not for this expression
+ return dataMetaClient.getIndexMetadata().isEmpty() ||
!isIndexDefinitionPresentForColumn(expressionIndexColumn,
PARTITION_NAME_EXPRESSION_INDEX_PREFIX, dataMetaClient);
+ }
+
+ private static boolean isIndexDefinitionPresentForColumn(String
indexedColumn, String indexType, HoodieTableMetaClient dataMetaClient) {
+ return dataMetaClient.getIndexMetadata().isPresent() &&
dataMetaClient.getIndexMetadata().get().getIndexDefinitions().values().stream()
+ .anyMatch(indexDefinition ->
indexDefinition.getSourceFields().contains(indexedColumn) &&
indexDefinition.getIndexType().equals(indexType));
Review Comment:
For an expression index we should check the expression as well, since two
expression indexes can be defined on the same column with different
expressions.
Also, we should probably log an error here; otherwise the user might not know
why the index is not being created even though the configuration is set.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -1524,4 +1526,59 @@ class TestMORDataSource extends
HoodieSparkClientTestBase with SparkDatasetMixin
// deleted record should still show in time travel view
assertEquals(1, timeTravelDF.where(s"_row_key = '$recordKey'").count())
}
+
+ /**
+ * Test Secondary Index creation through datasource and metadata write
configs.
+ *
+ * 1. Insert a batch of data with record_index enabled.
+ * 2. Upsert another batch of data with secondary index configs.
+ * 3. Validate that secondary index is created.
+ */
+ @Test
+ def testSecondaryIndexCreation(): Unit = {
+ var (writeOpts, readOpts) = getWriterReaderOpts()
+ writeOpts = writeOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key ->
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+ HoodieCompactionConfig.INLINE_COMPACT.key -> "false",
+ HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key -> "0",
+ HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true",
+ HoodieIndexConfig.INDEX_TYPE.key -> IndexType.RECORD_INDEX.name()
+ )
+ readOpts = readOpts ++ Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+ )
+ initMetaClient(HoodieTableType.MERGE_ON_READ)
+ // Create a MOR table and add 10 records to the table.
+ val records = recordsToStrings(dataGen.generateInserts("000",
3)).asScala.toSeq
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 10))
+ inputDF.write.format("org.apache.hudi")
+ .options(writeOpts)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val snapshotDF =
spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
+ assertEquals(10, snapshotDF.count())
+
+ // Upsert another batch with secondary index configs
+ val secondaryIndexName = "idx_name"
+ val secondaryIndexColumn = "name"
+ writeOpts = writeOpts ++ Map(
+ HoodieMetadataConfig.SECONDARY_INDEX_ENABLE_PROP.key -> "true",
+ HoodieMetadataConfig.SECONDARY_INDEX_NAME.key -> secondaryIndexName,
+ HoodieMetadataConfig.SECONDARY_INDEX_COLUMN.key -> secondaryIndexColumn
+ )
+ val records2 = recordsToStrings(dataGen.generateInserts("001",
3)).asScala.toSeq
+ val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2,
10))
+ inputDF2.write.format("org.apache.hudi")
+ .options(writeOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ // validate that secondary index is created
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ // validate the secondary index is built
+ assertTrue(metadataPartitionExists(basePath, context,
SECONDARY_INDEX.getPartitionPath(metaClient, secondaryIndexName)))
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(SECONDARY_INDEX.getPartitionPath(metaClient,
secondaryIndexName)))
Review Comment:
We can also add a case with two secondary indexes being created.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java:
##########
@@ -187,6 +187,7 @@ public int start(int retry) {
if (PARTITION_NAME_RECORD_INDEX.equals(p)) {
props.setProperty(RECORD_INDEX_ENABLE_PROP.key(), "true");
}
+ // TODO: check if we need to set other indexes as well
Review Comment:
Should we create a JIRA ticket for this TODO?
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java:
##########
@@ -179,6 +181,70 @@ public void testIndexerForRecordIndex() {
"streamer-config/indexer-record-index.properties");
}
+ /**
+ * Test indexer for RLI and secondary index.
+ */
+ @Test
+ public void testIndexerForSecondaryIndex() {
+ String tableName = "indexer_test_rli_si";
+ // enable files and bloom_filters only with the regular write client
+ HoodieMetadataConfig.Builder metadataConfigBuilder =
HoodieMetadataConfig.newBuilder()
+ .enable(true)
+ .withAsyncIndex(false).withMetadataIndexColumnStats(false);
+ upsertToTable(metadataConfigBuilder.build(), tableName);
+
+ // validate table config
+ metaClient = reload(metaClient);
+
assertTrue(metaClient.getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
+
+ // build RLI
+ indexMetadataPartitionsAndAssert(RECORD_INDEX,
Collections.singletonList(FILES), Arrays.asList(new MetadataPartitionType[]
{COLUMN_STATS, BLOOM_FILTERS}), tableName,
+ "streamer-config/indexer-record-index.properties");
+ // rebuild metadata config with secondary index name and indexed column
+ String indexName = "idx_name";
+ /*metadataConfigBuilder = HoodieMetadataConfig.newBuilder()
+ .enable(true)
+ .withAsyncIndex(false).withMetadataIndexColumnStats(false)
+ .withSecondaryIndexName(indexName)
+ .withSecondaryIndexForColumn("name");
+ upsertToTable(metadataConfigBuilder.build(), tableName);*/
Review Comment:
NIT: redundant code
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]