nsivabalan commented on code in PR #12653:
URL: https://github.com/apache/hudi/pull/12653#discussion_r1928165401
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java:
##########
@@ -133,51 +130,43 @@ private void createRecordIndex(HoodieTableMetaClient
metaClient, String userInde
@Override
public void createOrUpdateColumnStatsIndexDefinition(HoodieTableMetaClient
metaClient, List<String> columnsToIndex) {
- HoodieIndexDefinition indexDefinition = new
HoodieIndexDefinition(PARTITION_NAME_COLUMN_STATS, PARTITION_NAME_COLUMN_STATS,
PARTITION_NAME_COLUMN_STATS,
- columnsToIndex, Collections.EMPTY_MAP);
+ HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
+ .withIndexName(PARTITION_NAME_COLUMN_STATS)
+ .withIndexType(PARTITION_NAME_COLUMN_STATS)
+ .withIndexFunction(PARTITION_NAME_COLUMN_STATS)
+ .withSourceFields(columnsToIndex)
+ .withIndexOptions(Collections.EMPTY_MAP)
+ .build();
LOG.info("Registering Or Updating the index " +
PARTITION_NAME_COLUMN_STATS);
register(metaClient, indexDefinition);
}
private void createExpressionOrSecondaryIndex(HoodieTableMetaClient
metaClient, String userIndexName, String indexType,
Map<String, Map<String,
String>> columns, Map<String, String> options, Map<String, String>
tableProperties) throws Exception {
- String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
- ? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
- : PARTITION_NAME_EXPRESSION_INDEX_PREFIX + userIndexName;
- if (indexExists(metaClient, fullIndexName)) {
- throw new HoodieMetadataIndexException("Index already exists: " +
userIndexName);
- }
- checkArgument(columns.size() == 1, "Only one column can be indexed for
functional or secondary index.");
-
- if (!isEligibleForIndexing(metaClient, indexType, tableProperties,
columns)) {
- throw new HoodieMetadataIndexException("Not eligible for indexing: " +
indexType + ", indexName: " + userIndexName);
- }
-
- HoodieIndexDefinition indexDefinition = new
HoodieIndexDefinition(fullIndexName, indexType,
options.getOrDefault(EXPRESSION_OPTION, IDENTITY_FUNCTION),
- new ArrayList<>(columns.keySet()), options);
+ HoodieIndexDefinition indexDefinition =
HoodieIndexUtils.getSecondaryOrExpressionIndexDefinition(metaClient,
userIndexName, indexType, columns, options, tableProperties);
if
(!metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent()
|| !metaClient.getIndexMetadata().isPresent()
- ||
!metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(fullIndexName))
{
+ ||
!metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(indexDefinition.getIndexName()))
{
LOG.info("Index definition is not present. Registering the index first");
register(metaClient, indexDefinition);
}
ValidationUtils.checkState(metaClient.getIndexMetadata().isPresent(),
"Index definition is not present");
- LOG.info("Creating index {} of using {}", fullIndexName, indexType);
+ LOG.info("Creating index {} of using {}", indexDefinition.getIndexName(),
indexType);
Option<HoodieIndexDefinition> expressionIndexDefinitionOpt =
Option.ofNullable(indexDefinition);
try (SparkRDDWriteClient writeClient = getWriteClient(metaClient,
expressionIndexDefinitionOpt, Option.of(indexType))) {
MetadataPartitionType partitionType =
indexType.equals(PARTITION_NAME_SECONDARY_INDEX) ?
MetadataPartitionType.SECONDARY_INDEX : MetadataPartitionType.EXPRESSION_INDEX;
// generate index plan
- Option<String> indexInstantTime = doSchedule(writeClient, metaClient,
fullIndexName, partitionType);
+ Option<String> indexInstantTime = doSchedule(writeClient, metaClient,
indexDefinition.getIndexName(), partitionType);
if (indexInstantTime.isPresent()) {
// build index
writeClient.index(indexInstantTime.get());
} else {
throw new HoodieMetadataIndexException("Scheduling of index action did
not return any instant.");
}
} catch (Throwable t) {
- drop(metaClient, fullIndexName, Option.ofNullable(indexDefinition));
+ drop(metaClient, indexDefinition.getIndexName(),
Option.ofNullable(indexDefinition));
Review Comment:
Can we add a WARN log here stating that indexing failed and that we are
therefore dropping the index?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -211,38 +210,28 @@ public String getIndexDefinitionPath() {
}
/**
- * Builds expression index definition and writes to index definition file.
- * Support mutable and immutable index definition. Only col stats is
mutable, while all others are immutable.
- * Inacse of immutable index definition, we could only create or delete the
definition.
- * Incase of mutable (col stats), list of source columns (or list of columns
to index) could also change.
+ * Builds index definition and writes to index definition file. Support
mutable and immutable index definition.
+ * For instance, if index definition is mutable (like column stats), list of
source columns (or list of columns to index) could also change.
+ * If an index definition is present for the index name, it will be updated
only when there is difference between present and new index definition.
+ *
* @return true if index definition is updated.
*/
public boolean buildIndexDefinition(HoodieIndexDefinition indexDefinition) {
String indexName = indexDefinition.getIndexName();
- boolean isIndexDefnImmutable =
!indexDefinition.getIndexName().equals(PARTITION_NAME_COLUMN_STATS); // only
col stats is mutable.
- if (isIndexDefnImmutable) {
- checkState(
- !indexMetadataOpt.isPresent() ||
(!indexMetadataOpt.get().getIndexDefinitions().containsKey(indexName)),
- "Index metadata is already present");
- }
String indexMetaPath = getIndexDefinitionPath();
boolean updateIndexDefn = true;
if (indexMetadataOpt.isPresent()) {
- if (isIndexDefnImmutable) {
- indexMetadataOpt.get().getIndexDefinitions().put(indexName,
indexDefinition);
- } else {
- // if index defn is mutable, lets check for difference and only update
if required.
- if
(indexMetadataOpt.get().getIndexDefinitions().containsKey(indexName)) {
- if
(!indexMetadataOpt.get().getIndexDefinitions().get(indexName).getSourceFields().equals(indexDefinition.getSourceFields()))
{
- LOG.info(String.format("List of columns to index is changing. Old
value %s. New value %s",
-
indexMetadataOpt.get().getIndexDefinitions().get(indexName).getSourceFields(),
indexDefinition.getSourceFields()));
- indexMetadataOpt.get().getIndexDefinitions().put(indexName,
indexDefinition);
- } else {
- updateIndexDefn = false;
- }
- } else {
+ // if index definition is present, lets check for difference and only
update if required.
+ if (indexMetadataOpt.get().getIndexDefinitions().containsKey(indexName))
{
+ if
(!indexMetadataOpt.get().getIndexDefinitions().get(indexName).getSourceFields().equals(indexDefinition.getSourceFields()))
{
Review Comment:
For secondary and expression indexes, is comparing only the source fields
sufficient? Don't we also need to compare other attributes, such as the index
type, index options, etc.?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -460,4 +473,69 @@ public static <R> HoodieRecord<R>
createNewTaggedHoodieRecord(HoodieRecord<R> ol
throw new HoodieIndexException("Unsupported record type: " +
recordType);
}
}
+
+ /**
+ * Register a metadata index.
+ * Index definitions are stored in user-specified path or, by default, in
.hoodie/.index_defs/index.json.
+ * For the first time, the index definition file will be created if not
exists.
+ * For the second time, the index definition file will be updated if exists.
+ * Table Config is updated if necessary.
+ */
+ public static void register(HoodieTableMetaClient metaClient,
HoodieIndexDefinition indexDefinition) {
+ LOG.info("Registering index {} of using {}",
indexDefinition.getIndexName(), indexDefinition.getIndexType());
+ // build HoodieIndexMetadata and then add to index definition file
+ boolean indexDefnUpdated =
metaClient.buildIndexDefinition(indexDefinition);
+ if (indexDefnUpdated) {
+ String indexMetaPath = metaClient.getIndexDefinitionPath();
+ // update table config if necessary
+ if
(!metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key())
+ ||
!metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent()) {
+
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH,
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(indexMetaPath)));
+ HoodieTableConfig.update(metaClient.getStorage(),
metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
+ }
+ }
+ }
+
+ public static HoodieIndexDefinition
getSecondaryOrExpressionIndexDefinition(HoodieTableMetaClient metaClient,
String userIndexName, String indexType, Map<String, Map<String, String>>
columns,
+
Map<String, String> options, Map<String, String> tableProperties) throws
Exception {
+ String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
+ ? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
+ : PARTITION_NAME_EXPRESSION_INDEX_PREFIX + userIndexName;
+ if (indexExists(metaClient, fullIndexName)) {
+ throw new HoodieMetadataIndexException("Index already exists: " +
userIndexName);
+ }
+ checkArgument(columns.size() == 1, "Only one column can be indexed for
functional or secondary index.");
+
+ if (!isEligibleForSecondaryOrExpressionIndex(metaClient, indexType,
tableProperties, columns)) {
+ throw new HoodieMetadataIndexException("Not eligible for indexing: " +
indexType + ", indexName: " + userIndexName);
+ }
+
+ return HoodieIndexDefinition.newBuilder()
+ .withIndexName(fullIndexName)
+ .withIndexType(indexType)
+ .withIndexFunction(options.getOrDefault(EXPRESSION_OPTION,
IDENTITY_TRANSFORM))
+ .withSourceFields(new ArrayList<>(columns.keySet()))
+ .withIndexOptions(options)
+ .build();
+ }
+
+ public static boolean indexExists(HoodieTableMetaClient metaClient, String
indexName) {
Review Comment:
Why is this public?
If we need it for testing, can you add the @VisibleForTesting annotation?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]