nsivabalan commented on code in PR #12653:
URL: https://github.com/apache/hudi/pull/12653#discussion_r1924058446
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -460,4 +473,66 @@ public static <R> HoodieRecord<R>
createNewTaggedHoodieRecord(HoodieRecord<R> ol
throw new HoodieIndexException("Unsupported record type: " +
recordType);
}
}
+
+ /**
+ * Register a metadata index.
+ * Index definitions are stored in user-specified path or, by default, in
.hoodie/.index_defs/index.json.
+ * For the first time, the index definition file will be created if not
exists.
+ * For the second time, the index definition file will be updated if exists.
+ * Table Config is updated if necessary.
+ */
+ public static void register(HoodieTableMetaClient metaClient,
HoodieIndexDefinition indexDefinition) {
+ LOG.info("Registering index {} of using {}",
indexDefinition.getIndexName(), indexDefinition.getIndexType());
+ // build HoodieIndexMetadata and then add to index definition file
+ boolean indexDefnUpdated =
metaClient.buildIndexDefinition(indexDefinition);
+ if (indexDefnUpdated) {
+ String indexMetaPath = metaClient.getIndexDefinitionPath();
+ // update table config if necessary
+ if
(!metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key())
+ ||
!metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent()) {
+
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH,
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(indexMetaPath)));
+ HoodieTableConfig.update(metaClient.getStorage(),
metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
+ }
+ }
+ }
+
+ public static HoodieIndexDefinition
getSecondaryOrExpressionIndexDefinition(HoodieTableMetaClient metaClient,
String userIndexName, String indexType, Map<String, Map<String, String>>
columns,
+
Map<String, String> options, Map<String, String> tableProperties) throws
Exception {
+ String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
+ ? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
+ : PARTITION_NAME_EXPRESSION_INDEX_PREFIX + userIndexName;
+ if (indexExists(metaClient, fullIndexName)) {
+ throw new HoodieMetadataIndexException("Index already exists: " +
userIndexName);
+ }
+ checkArgument(columns.size() == 1, "Only one column can be indexed for
functional or secondary index.");
+
+ if (!isEligibleForIndexing(metaClient, indexType, tableProperties,
columns)) {
+ throw new HoodieMetadataIndexException("Not eligible for indexing: " +
indexType + ", indexName: " + userIndexName);
+ }
+
+ return HoodieIndexDefinition.newBuilder()
+ .withIndexName(fullIndexName)
+ .withIndexType(indexType)
+ .withIndexFunction(options.getOrDefault(EXPRESSION_OPTION,
IDENTITY_TRANSFORM))
+ .withSourceFields(new ArrayList<>(columns.keySet()))
+ .withIndexOptions(options)
+ .build();
+ }
+
+ public static boolean indexExists(HoodieTableMetaClient metaClient, String
indexName) {
+ return
metaClient.getTableConfig().getMetadataPartitions().stream().anyMatch(partition
-> partition.equals(indexName));
+ }
+
+ private static boolean isEligibleForIndexing(HoodieTableMetaClient
metaClient, String indexType, Map<String, String> options, Map<String,
Map<String, String>> columns) throws Exception {
Review Comment:
is this applicable only for sec index or other indexes too?
if yes, condition in 527 is not very apparent. I would expect something like
```
if (curIndex is sec Index && !validateDataTypeForSecondaryIndex(new
ArrayList<>(columns.keySet()), new
TableSchemaResolver(metaClient).getTableAvroSchema())
{
}
```
or I would rename the method to "isEligibleForSecIndexing"
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -352,6 +358,33 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Parallelism to use, when generating expression
index.");
+ public static final ConfigProperty<String> EXPRESSION_INDEX_COLUMN =
ConfigProperty
+ .key(METADATA_PREFIX + ".index.expression.column")
Review Comment:
are these mainly introduced to support index creation via spark datasource
writer?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2766,6 +2768,79 @@ private static void validatePayload(int type,
Map<String, HoodieMetadataFileInfo
}
}
+ public static Set<String>
getExpressionIndexPartitionsToInit(MetadataPartitionType partitionType,
HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataMetaClient) {
+ Set<String> expressionIndexPartitionsToInit =
getIndexPartitionsToInit(partitionType, dataMetaClient);
+ if (expressionIndexPartitionsToInit.isEmpty()) {
+ if (isNewExpressionIndexDefinitionRequired(metadataConfig,
dataMetaClient)) {
+ String indexedColumn = metadataConfig.getExpressionIndexColumn();
+ String indexName = metadataConfig.getExpressionIndexName();
+ String indexType = metadataConfig.getExpressionIndexType();
+ // Use a default index name if the indexed column is specified but
index name is not
+ if (StringUtils.isNullOrEmpty(indexName) &&
StringUtils.nonEmpty(indexedColumn)) {
+ indexName = PARTITION_NAME_EXPRESSION_INDEX_PREFIX + indexedColumn;
+ }
+ // if user defined index name does not contain the expression_index_
prefix, then add it
+ if (StringUtils.nonEmpty(indexName) &&
!indexName.startsWith(PARTITION_NAME_EXPRESSION_INDEX_PREFIX)) {
+ indexName = PARTITION_NAME_EXPRESSION_INDEX_PREFIX + indexName;
+ }
+ // Build and register the new index definition
+ HoodieIndexDefinition indexDefinition =
HoodieIndexDefinition.newBuilder()
+ .withIndexName(indexName)
+ .withIndexType(indexType)
+ .withSourceFields(Collections.singletonList(indexedColumn))
+ .withIndexOptions(metadataConfig.getExpressionIndexOptions())
+ .build();
+ dataMetaClient.buildIndexDefinition(indexDefinition);
+ // Re-fetch the partitions after adding the new definition
+ expressionIndexPartitionsToInit =
getIndexPartitionsToInit(partitionType, dataMetaClient);
+ } else {
+ return null;
+ }
+ }
+ return expressionIndexPartitionsToInit;
+ }
+
+ public static Set<String>
getSecondaryIndexPartitionsToInit(MetadataPartitionType partitionType,
HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataMetaClient) {
+ Set<String> secondaryIndexPartitionsToInit =
getIndexPartitionsToInit(partitionType, dataMetaClient);
Review Comment:
looks like there is some code duplication. Can we templatize it and use it for
both the expression index and the secondary index?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2766,6 +2768,79 @@ private static void validatePayload(int type,
Map<String, HoodieMetadataFileInfo
}
}
+ public static Set<String>
getExpressionIndexPartitionsToInit(MetadataPartitionType partitionType,
HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataMetaClient) {
+ Set<String> expressionIndexPartitionsToInit =
getIndexPartitionsToInit(partitionType, dataMetaClient);
+ if (expressionIndexPartitionsToInit.isEmpty()) {
+ if (isNewExpressionIndexDefinitionRequired(metadataConfig,
dataMetaClient)) {
+ String indexedColumn = metadataConfig.getExpressionIndexColumn();
+ String indexName = metadataConfig.getExpressionIndexName();
+ String indexType = metadataConfig.getExpressionIndexType();
+ // Use a default index name if the indexed column is specified but
index name is not
+ if (StringUtils.isNullOrEmpty(indexName) &&
StringUtils.nonEmpty(indexedColumn)) {
+ indexName = PARTITION_NAME_EXPRESSION_INDEX_PREFIX + indexedColumn;
+ }
+ // if user defined index name does not contain the expression_index_
prefix, then add it
+ if (StringUtils.nonEmpty(indexName) &&
!indexName.startsWith(PARTITION_NAME_EXPRESSION_INDEX_PREFIX)) {
+ indexName = PARTITION_NAME_EXPRESSION_INDEX_PREFIX + indexName;
+ }
+ // Build and register the new index definition
+ HoodieIndexDefinition indexDefinition =
HoodieIndexDefinition.newBuilder()
+ .withIndexName(indexName)
+ .withIndexType(indexType)
+ .withSourceFields(Collections.singletonList(indexedColumn))
+ .withIndexOptions(metadataConfig.getExpressionIndexOptions())
+ .build();
+ dataMetaClient.buildIndexDefinition(indexDefinition);
+ // Re-fetch the partitions after adding the new definition
+ expressionIndexPartitionsToInit =
getIndexPartitionsToInit(partitionType, dataMetaClient);
+ } else {
+ return null;
Review Comment:
can we return empty list instead
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -460,4 +473,61 @@ public static <R> HoodieRecord<R>
createNewTaggedHoodieRecord(HoodieRecord<R> ol
throw new HoodieIndexException("Unsupported record type: " +
recordType);
}
}
+
+ /**
+ * Register a metadata index.
+ * Index definitions are stored in user-specified path or, by default, in
.hoodie/.index_defs/index.json.
+ * For the first time, the index definition file will be created if not
exists.
+ * For the second time, the index definition file will be updated if exists.
+ * Table Config is updated if necessary.
+ */
+ public static void register(HoodieTableMetaClient metaClient,
HoodieIndexDefinition indexDefinition) {
+ LOG.info("Registering index {} of using {}",
indexDefinition.getIndexName(), indexDefinition.getIndexType());
+ // build HoodieIndexMetadata and then add to index definition file
+ boolean indexDefnUpdated =
metaClient.buildIndexDefinition(indexDefinition);
+ if (indexDefnUpdated) {
+ String indexMetaPath = metaClient.getIndexDefinitionPath();
+ // update table config if necessary
+ if
(!metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key())
+ ||
!metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent()) {
Review Comment:
can you add "Notes to Reviewer" for such code snippets where the code was just
moved and is not a new change? It might save some time in reviews.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2766,6 +2768,79 @@ private static void validatePayload(int type,
Map<String, HoodieMetadataFileInfo
}
}
+ public static Set<String>
getExpressionIndexPartitionsToInit(MetadataPartitionType partitionType,
HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataMetaClient) {
+ Set<String> expressionIndexPartitionsToInit =
getIndexPartitionsToInit(partitionType, dataMetaClient);
+ if (expressionIndexPartitionsToInit.isEmpty()) {
+ if (isNewExpressionIndexDefinitionRequired(metadataConfig,
dataMetaClient)) {
Review Comment:
this is a little unintuitive.
I would expect L 2772 to return the new index to initialize.
but if it turns out to be empty, here, we end up checking for new index defn
again. so, what does getIndexPartitionsToInit(partitionType, dataMetaClient)
lacks?
maybe we can rename the methods to avoid ambiguity
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -1524,4 +1524,79 @@ class TestMORDataSource extends
HoodieSparkClientTestBase with SparkDatasetMixin
// deleted record should still show in time travel view
assertEquals(1, timeTravelDF.where(s"_row_key = '$recordKey'").count())
}
+
+ /**
+ * Test Secondary Index creation through datasource and metadata write
configs.
+ *
+ * 1. Insert a batch of data with record_index enabled.
+ * 2. Upsert another batch of data with secondary index configs.
+ * 3. Validate that secondary index is created.
+ */
+ @Test
+ def testSecondaryIndexCreation(): Unit = {
+ var (writeOpts, readOpts) = getWriterReaderOpts()
+ writeOpts = writeOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key ->
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+ HoodieCompactionConfig.INLINE_COMPACT.key -> "false",
+ HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key -> "0",
+ HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true",
+ HoodieIndexConfig.INDEX_TYPE.key -> IndexType.RECORD_INDEX.name()
+ )
+ readOpts = readOpts ++ Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+ )
+ initMetaClient(HoodieTableType.MERGE_ON_READ)
+ // Create a MOR table and add 10 records to the table.
+ val records = recordsToStrings(dataGen.generateInserts("000",
3)).asScala.toSeq
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF.write.format("org.apache.hudi")
+ .options(writeOpts)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val snapshotDF =
spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
+ assertEquals(3, snapshotDF.count())
+
+ // Upsert another batch with secondary index configs
+ val secondaryIndexName = "idx_rider"
+ val secondaryIndexColumn = "rider"
+ writeOpts = writeOpts ++ Map(
+ HoodieMetadataConfig.SECONDARY_INDEX_ENABLE_PROP.key -> "true",
+ HoodieMetadataConfig.SECONDARY_INDEX_NAME.key -> secondaryIndexName,
+ HoodieMetadataConfig.SECONDARY_INDEX_COLUMN.key -> secondaryIndexColumn
+ )
+ val records2 = recordsToStrings(dataGen.generateInserts("001",
3)).asScala.toSeq
+ val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2,
10))
+ inputDF2.write.format("org.apache.hudi")
+ .options(writeOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ // validate that secondary index is created
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ assertTrue(metadataPartitionExists(basePath, context,
PARTITION_NAME_SECONDARY_INDEX_PREFIX + secondaryIndexName))
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_NAME_SECONDARY_INDEX_PREFIX
+ secondaryIndexName))
Review Comment:
I am just trying to improve the usability for users. Index creation is just
a one-off thing which a central team/admin can do. And then individual teams can
own the regular writes (from an organization standpoint).
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -460,4 +473,66 @@ public static <R> HoodieRecord<R>
createNewTaggedHoodieRecord(HoodieRecord<R> ol
throw new HoodieIndexException("Unsupported record type: " +
recordType);
}
}
+
+ /**
+ * Register a metadata index.
+ * Index definitions are stored in user-specified path or, by default, in
.hoodie/.index_defs/index.json.
+ * For the first time, the index definition file will be created if not
exists.
+ * For the second time, the index definition file will be updated if exists.
+ * Table Config is updated if necessary.
+ */
+ public static void register(HoodieTableMetaClient metaClient,
HoodieIndexDefinition indexDefinition) {
+ LOG.info("Registering index {} of using {}",
indexDefinition.getIndexName(), indexDefinition.getIndexType());
+ // build HoodieIndexMetadata and then add to index definition file
+ boolean indexDefnUpdated =
metaClient.buildIndexDefinition(indexDefinition);
+ if (indexDefnUpdated) {
+ String indexMetaPath = metaClient.getIndexDefinitionPath();
+ // update table config if necessary
+ if
(!metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key())
+ ||
!metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent()) {
+
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH,
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(indexMetaPath)));
+ HoodieTableConfig.update(metaClient.getStorage(),
metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
+ }
+ }
+ }
+
+ public static HoodieIndexDefinition
getSecondaryOrExpressionIndexDefinition(HoodieTableMetaClient metaClient,
String userIndexName, String indexType, Map<String, Map<String, String>>
columns,
+
Map<String, String> options, Map<String, String> tableProperties) throws
Exception {
+ String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
+ ? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
+ : PARTITION_NAME_EXPRESSION_INDEX_PREFIX + userIndexName;
+ if (indexExists(metaClient, fullIndexName)) {
+ throw new HoodieMetadataIndexException("Index already exists: " +
userIndexName);
+ }
+ checkArgument(columns.size() == 1, "Only one column can be indexed for
functional or secondary index.");
+
+ if (!isEligibleForIndexing(metaClient, indexType, tableProperties,
columns)) {
+ throw new HoodieMetadataIndexException("Not eligible for indexing: " +
indexType + ", indexName: " + userIndexName);
+ }
+
+ return HoodieIndexDefinition.newBuilder()
+ .withIndexName(fullIndexName)
+ .withIndexType(indexType)
+ .withIndexFunction(options.getOrDefault(EXPRESSION_OPTION,
IDENTITY_TRANSFORM))
+ .withSourceFields(new ArrayList<>(columns.keySet()))
+ .withIndexOptions(options)
+ .build();
+ }
+
+ public static boolean indexExists(HoodieTableMetaClient metaClient, String
indexName) {
+ return
metaClient.getTableConfig().getMetadataPartitions().stream().anyMatch(partition
-> partition.equals(indexName));
+ }
+
+ private static boolean isEligibleForIndexing(HoodieTableMetaClient
metaClient, String indexType, Map<String, String> options, Map<String,
Map<String, String>> columns) throws Exception {
+ if (!validateDataTypeForSecondaryIndex(new ArrayList<>(columns.keySet()),
new TableSchemaResolver(metaClient).getTableAvroSchema())) {
+ return false;
+ }
+ // for secondary index, record index is a must
+ if (indexType.equals(PARTITION_NAME_SECONDARY_INDEX)) {
+ // either record index is enabled or record index partition is already
present
+ return
metaClient.getTableConfig().getMetadataPartitions().stream().anyMatch(partition
-> partition.equals(MetadataPartitionType.RECORD_INDEX.getPartitionPath()))
Review Comment:
why can't we move data type validation in L527 to here (within this if block)
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -220,11 +220,6 @@ public String getIndexDefinitionPath() {
public boolean buildIndexDefinition(HoodieIndexDefinition indexDefinition) {
String indexName = indexDefinition.getIndexName();
boolean isIndexDefnImmutable =
!indexDefinition.getIndexName().equals(PARTITION_NAME_COLUMN_STATS); // only
col stats is mutable.
- if (isIndexDefnImmutable) {
Review Comment:
looks like we don't even need L 222. Can we remove it then?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1050,6 +1037,7 @@ private Set<String> getMetadataPartitionsToUpdate() {
}
public void buildMetadataPartitions(HoodieEngineContext engineContext,
List<HoodieIndexPartitionInfo> indexPartitionInfos, String instantTime) throws
IOException {
+ // TODO: so we need to create an index definition in case we're coming via
the indexer
Review Comment:
can you link follow up jira id
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -1524,4 +1524,79 @@ class TestMORDataSource extends
HoodieSparkClientTestBase with SparkDatasetMixin
// deleted record should still show in time travel view
assertEquals(1, timeTravelDF.where(s"_row_key = '$recordKey'").count())
}
+
+ /**
+ * Test Secondary Index creation through datasource and metadata write
configs.
+ *
+ * 1. Insert a batch of data with record_index enabled.
+ * 2. Upsert another batch of data with secondary index configs.
+ * 3. Validate that secondary index is created.
+ */
+ @Test
+ def testSecondaryIndexCreation(): Unit = {
+ var (writeOpts, readOpts) = getWriterReaderOpts()
+ writeOpts = writeOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key ->
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+ HoodieCompactionConfig.INLINE_COMPACT.key -> "false",
+ HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key -> "0",
+ HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true",
+ HoodieIndexConfig.INDEX_TYPE.key -> IndexType.RECORD_INDEX.name()
+ )
+ readOpts = readOpts ++ Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+ )
+ initMetaClient(HoodieTableType.MERGE_ON_READ)
+ // Create a MOR table and add 10 records to the table.
+ val records = recordsToStrings(dataGen.generateInserts("000",
3)).asScala.toSeq
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF.write.format("org.apache.hudi")
+ .options(writeOpts)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val snapshotDF =
spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
+ assertEquals(3, snapshotDF.count())
+
+ // Upsert another batch with secondary index configs
+ val secondaryIndexName = "idx_rider"
+ val secondaryIndexColumn = "rider"
+ writeOpts = writeOpts ++ Map(
+ HoodieMetadataConfig.SECONDARY_INDEX_ENABLE_PROP.key -> "true",
+ HoodieMetadataConfig.SECONDARY_INDEX_NAME.key -> secondaryIndexName,
+ HoodieMetadataConfig.SECONDARY_INDEX_COLUMN.key -> secondaryIndexColumn
+ )
+ val records2 = recordsToStrings(dataGen.generateInserts("001",
3)).asScala.toSeq
+ val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2,
10))
+ inputDF2.write.format("org.apache.hudi")
+ .options(writeOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ // validate that secondary index is created
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ assertTrue(metadataPartitionExists(basePath, context,
PARTITION_NAME_SECONDARY_INDEX_PREFIX + secondaryIndexName))
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_NAME_SECONDARY_INDEX_PREFIX
+ secondaryIndexName))
Review Comment:
what's the expectation for subsequent writes to the hudi table?
do we expect users to set all secondary-index-related configs for all subsequent
writes too?
since in your other patch we are introducing an explicit config for mdt
partitions to delete, maybe the user can ignore these secondary-index-related configs in
subsequent writes and hudi will still continue to write/populate the SI partition.
what's your take?
if yes, can we enhance the test and validate that?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java:
##########
@@ -133,6 +133,7 @@ public Option<HoodieIndexPlan> execute() {
private HoodieIndexPartitionInfo
buildIndexPartitionInfo(MetadataPartitionType partitionType, HoodieInstant
indexUptoInstant) {
// for expression index, we need to pass the index name as the partition
name
+ // TODO: see the index partition info is built correctly. Should we
register the index here?
Review Comment:
can we remove the comment then?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -471,6 +471,59 @@ public static MetadataPartitionType
fromPartitionPath(String partitionPath) {
throw new IllegalArgumentException("No MetadataPartitionType for partition
path: " + partitionPath);
}
+ /**
+ * Given metadata config and table config, determine whether a new secondary
index definition is required.
+ */
+ public static boolean
isNewSecondaryIndexDefinitionRequired(HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataMetaClient) {
+ String secondaryIndexColumn = metadataConfig.getSecondaryIndexColumn();
+ if (StringUtils.isNullOrEmpty(secondaryIndexColumn)) {
+ return false;
+ }
+ // check the index definition already exists or not for this column
+ List<HoodieIndexDefinition> indexDefinitions =
getIndexDefinitions(secondaryIndexColumn, PARTITION_NAME_SECONDARY_INDEX,
dataMetaClient);
+ return indexDefinitions.isEmpty();
+ }
+
+ /**
+ * Given metadata config and table config, determine whether a new
expression index definition is required.
+ */
+ public static boolean
isNewExpressionIndexDefinitionRequired(HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataMetaClient) {
+ String expressionIndexColumn = metadataConfig.getExpressionIndexColumn();
+ if (StringUtils.isNullOrEmpty(expressionIndexColumn)) {
+ return false;
+ }
+
+ // check that expr is present in index options
+ Map<String, String> expressionIndexOptions =
metadataConfig.getExpressionIndexOptions();
+ if (expressionIndexOptions.isEmpty()) {
+ return false;
+ }
+
+ // get all index definitions for this column and index type
+ // check if none of the index definitions has index function matching the
expression
+ List<HoodieIndexDefinition> indexDefinitions =
getIndexDefinitions(expressionIndexColumn, PARTITION_NAME_EXPRESSION_INDEX,
dataMetaClient);
+ return indexDefinitions.isEmpty()
+ || indexDefinitions.stream().noneMatch(indexDefinition ->
indexDefinition.getIndexFunction().equals(expressionIndexOptions.get(HoodieExpressionIndex.EXPRESSION_OPTION)));
Review Comment:
can't we override the equality func for HoodieIndexDefinition? That would simplify
these checks, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]