codope commented on code in PR #12529:
URL: https://github.com/apache/hudi/pull/12529#discussion_r1899960031
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -296,6 +297,12 @@ protected void commit(HoodieTable table, String
commitActionType, String instant
writeTableMetadata(table, instantTime, metadata);
activeTimeline.saveAsComplete(false,
table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT,
commitActionType, instantTime),
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(),
metadata));
+ // update cols to Index as applicable
+ HoodieColStatsIndexUtils.updateColsToIndex(table, config, metadata,
+ (Functions.Function2<HoodieTableMetaClient, List<String>, Void>)
(val1, val2) -> {
+ updateColumnsToIndexWithColStats(val1, val2);
+ return null;
Review Comment:
Is there a better way than using `Function`, so that we can avoid returning null?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColStatsIndexUtils.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+
+/**
+ * Utils to assist with updating columns to index with col stats.
+ */
+public class HoodieColStatsIndexUtils {
Review Comment:
Let's rename the class to `HoodieColumnStatsIndexUtils`, and wherever using
`*ColStats*` in class or method names can be renamed to `*ColumnStats*`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -230,12 +233,25 @@ protected void commit(HoodieWriteMetadata<O> result,
List<HoodieWriteStat> write
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(),
metadata));
LOG.info("Committed " + instantTime);
result.setCommitMetadata(Option.of(metadata));
+ // update cols to Index as applicable
+ HoodieColStatsIndexUtils.updateColsToIndex(table, config, metadata,
+ (Functions.Function2<HoodieTableMetaClient, List<String>, Void>)
(val1, val2) -> {
Review Comment:
```suggestion
(Functions.Function2<HoodieTableMetaClient, List<String>, Void>)
(metaClient, columnsToIndex) -> {
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -296,6 +297,12 @@ protected void commit(HoodieTable table, String
commitActionType, String instant
writeTableMetadata(table, instantTime, metadata);
activeTimeline.saveAsComplete(false,
table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT,
commitActionType, instantTime),
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(),
metadata));
+ // update cols to Index as applicable
+ HoodieColStatsIndexUtils.updateColsToIndex(table, config, metadata,
Review Comment:
Can you please remind me why we are updating the columns after writing the data
commit file? What if we moved this into `writeTableMetadata`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColStatsIndexUtils.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+
+/**
+ * Utils to assist with updating columns to index with col stats.
+ */
+public class HoodieColStatsIndexUtils {
+
+ /**
+ * Updates the list of columns to index with col stats partition in MDT.
+ * @param dataTable {@link HoodieTable} of interest.
+ * @param config {@link HoodieWriteConfig} of interest.
+ * @param commitMetadata commit metadata of interest.
+ * @param updateColSatsFunc function to assist with updating columns to
index.
+ */
+ @VisibleForTesting
+ public static void updateColsToIndex(HoodieTable dataTable,
HoodieWriteConfig config, HoodieCommitMetadata commitMetadata,
+ Functions.Function2<HoodieTableMetaClient,
List<String>, Void> updateColSatsFunc) {
+ if (config.getMetadataConfig().isColumnStatsIndexEnabled()) {
+ dataTable.getMetaClient().reloadTableConfig();
+ if
(dataTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()))
{
+ try {
+ HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
+ .setStorage(dataTable.getStorage())
+
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(dataTable.getMetaClient().getBasePath()))
+ .build();
+ HoodieInstant latestInstant =
mdtMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant().get();
+ final HoodieCommitMetadata mdtCommitMetadata =
mdtMetaClient.getTimelineLayout().getCommitMetadataSerDe().deserialize(
+ latestInstant,
+
mdtMetaClient.getActiveTimeline().getInstantDetails(latestInstant).get(),
+ HoodieCommitMetadata.class);
+ if
(mdtCommitMetadata.getPartitionToWriteStats().containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath()))
{
Review Comment:
if we move the call inside `writeTableMetadata`, do we still need to do all
of this? We have the metadata writer in that method, right?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieSparkIndex.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.HoodieSparkIndexClient;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static
org.apache.hudi.index.expression.ExpressionIndexSparkFunctions.IDENTITY_FUNCTION;
+import static
org.apache.hudi.index.expression.HoodieExpressionIndex.RANGE_TYPE;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_EXPRESSION_INDEX;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_EXPRESSION_INDEX_PREFIX;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieSparkIndex extends HoodieClientTestBase {
+
+ @Test
+ public void testIndexCreateAndDrop() throws IOException {
+ HoodieWriteConfig cfg = getConfigBuilder().build();
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0x17AB);
+
+ initMetaClient(HoodieTableType.COPY_ON_WRITE);
+
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ String commitTime1 = "001";
+ client.startCommitWithTime(commitTime1);
+ List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
+ JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(records1, 1);
+ List<WriteStatus> statuses1 = client.upsert(writeRecords1,
commitTime1).collect();
+ assertNoWriteErrors(statuses1);
+ }
+
+ HoodieSparkIndexClient sparkIndexClient = new HoodieSparkIndexClient(cfg,
context);
+
+ // add 4 indices(2 SI and 2 EI)
+ String indexNamePrefix = "index";
+ String fieldNamePrefix = "field";
+ for (int i = 0; i < 5; i++) {
+ String indexName = indexNamePrefix + "_" + i;
+ String fieldName = fieldNamePrefix + "_" + i;
+ HoodieIndexDefinition indexDefinition = getHoodieIndexDefn(indexName,
+ i % 2 == 0 ? PARTITION_NAME_SECONDARY_INDEX :
PARTITION_NAME_EXPRESSION_INDEX, fieldName);
+ sparkIndexClient.register(metaClient, indexDefinition);
+ readAndValidateIndexDefn(indexDefinition);
+ }
+
+ // add col stats index.
+ HoodieIndexDefinition colStatsIndexDefinition =
getHoodieIndexDefn(PARTITION_NAME_COLUMN_STATS,
+ RANGE_TYPE, fieldNamePrefix + "_5");
+ sparkIndexClient.register(metaClient, colStatsIndexDefinition);
+ readAndValidateIndexDefn(colStatsIndexDefinition);
+
+ // drop one among sec index and expression index.
+ metaClient.deleteIndexDefinition(getIndexFullName(indexNamePrefix + "_1",
PARTITION_NAME_EXPRESSION_INDEX));
+ metaClient.deleteIndexDefinition(getIndexFullName(indexNamePrefix + "_2",
PARTITION_NAME_SECONDARY_INDEX));
+
+ //validate rest.
+ HoodieTableMetaClient newMetaClient =
HoodieTableMetaClient.builder().setBasePath(metaClient.getBasePath()).setConf(metaClient.getStorageConf())
+ .build();
+ HoodieIndexDefinition indexDefinition = getHoodieIndexDefn(indexNamePrefix
+ "_0", PARTITION_NAME_SECONDARY_INDEX, fieldNamePrefix + "_0");
+ readAndValidateIndexDefn(indexDefinition, newMetaClient);
+ indexDefinition = getHoodieIndexDefn(indexNamePrefix + "_3",
PARTITION_NAME_EXPRESSION_INDEX, fieldNamePrefix + "_3");
+ readAndValidateIndexDefn(indexDefinition, newMetaClient);
+ indexDefinition = getHoodieIndexDefn(indexNamePrefix + "_4",
PARTITION_NAME_SECONDARY_INDEX, fieldNamePrefix + "_4");
+ readAndValidateIndexDefn(indexDefinition, newMetaClient);
+ readAndValidateIndexDefn(colStatsIndexDefinition);
+
+ // update col stats w/ new set of cols
+ List<String> colsToIndex = IntStream.range(0, 10).mapToObj(number ->
fieldNamePrefix + "_" + number).collect(Collectors.toList());
+ colStatsIndexDefinition = getHoodieIndexDefn(PARTITION_NAME_COLUMN_STATS,
+ RANGE_TYPE, RANGE_TYPE, colsToIndex, Collections.EMPTY_MAP);
+ sparkIndexClient.register(metaClient, colStatsIndexDefinition);
+ readAndValidateIndexDefn(colStatsIndexDefinition);
+
+ // drop col stats
+ metaClient.deleteIndexDefinition(colStatsIndexDefinition.getIndexName());
+ readAndValidateIndexDefnNotPresent(colStatsIndexDefinition,
HoodieTableMetaClient.builder().setBasePath(metaClient.getBasePath()).setConf(metaClient.getStorageConf())
+ .build());
+ }
+
+ private void readAndValidateIndexDefn(HoodieIndexDefinition
expectedIndexDefn) {
+ HoodieTableMetaClient newMetaClient =
HoodieTableMetaClient.builder().setBasePath(metaClient.getBasePath()).setConf(metaClient.getStorageConf())
+ .build();
+ readAndValidateIndexDefn(expectedIndexDefn, newMetaClient);
+ }
+
+ private void readAndValidateIndexDefn(HoodieIndexDefinition
expectedIndexDefn, HoodieTableMetaClient metaClient) {
+ assertTrue(metaClient.getIndexMetadata().isPresent());
+
assertTrue(!metaClient.getIndexMetadata().get().getIndexDefinitions().isEmpty());
+
assertTrue(metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(expectedIndexDefn.getIndexName()));
+ assertEquals(expectedIndexDefn,
metaClient.getIndexMetadata().get().getIndexDefinitions().get(expectedIndexDefn.getIndexName()));
+ }
+
+ private void readAndValidateIndexDefnNotPresent(HoodieIndexDefinition
expectedIndexDefn, HoodieTableMetaClient metaClient) {
+ assertTrue(metaClient.getIndexMetadata().isPresent());
+
assertTrue(!metaClient.getIndexMetadata().get().getIndexDefinitions().isEmpty());
+
assertTrue(!metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(expectedIndexDefn.getIndexName()));
+ }
+
+ private HoodieIndexDefinition getHoodieIndexDefn(String indexName, String
indexType, String sourceField) {
Review Comment:
```suggestion
private HoodieIndexDefinition getIndexDefinition(String indexName, String
indexType, String sourceField) {
```
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1683,11 +1688,11 @@ private static Comparable<?> coerceToComparable(Schema
schema, Object val) {
}
private static boolean isColumnTypeSupported(Schema schema,
Option<HoodieRecordType> recordType) {
- // if record type is set and if its AVRO, MAP is unsupported.
- if (recordType.isPresent() && recordType.get() == HoodieRecordType.AVRO) {
+ // if record type is not set or if its AVRO, MAP is unsupported.
+ if (!recordType.isPresent() || recordType.get() == HoodieRecordType.AVRO) {
Review Comment:
let's add a test for:
a) nested field
b) a field configured to index in the metadata config whose data type is not
supported. In this case, I expect to either error out and have no update in
the index definition, or only update supported fields in the index definition.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -296,6 +297,12 @@ protected void commit(HoodieTable table, String
commitActionType, String instant
writeTableMetadata(table, instantTime, metadata);
activeTimeline.saveAsComplete(false,
table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT,
commitActionType, instantTime),
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(),
metadata));
+ // update cols to Index as applicable
+ HoodieColStatsIndexUtils.updateColsToIndex(table, config, metadata,
+ (Functions.Function2<HoodieTableMetaClient, List<String>, Void>)
(val1, val2) -> {
Review Comment:
```suggestion
(Functions.Function2<HoodieTableMetaClient, List<String>, Void>)
(metaClient, columnsToIndex) -> {
```
##########
hudi-common/src/main/java/org/apache/hudi/index/expression/HoodieExpressionIndex.java:
##########
@@ -45,6 +45,8 @@ public interface HoodieExpressionIndex<S, T> extends
Serializable {
String DAYS_OPTION = "days";
String FORMAT_OPTION = "format";
+ String RANGE_TYPE = "range";
Review Comment:
Can't we use `column_stats` as index type? It is consistent with EI.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -230,12 +233,25 @@ protected void commit(HoodieWriteMetadata<O> result,
List<HoodieWriteStat> write
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(),
metadata));
LOG.info("Committed " + instantTime);
result.setCommitMetadata(Option.of(metadata));
+ // update cols to Index as applicable
+ HoodieColStatsIndexUtils.updateColsToIndex(table, config, metadata,
+ (Functions.Function2<HoodieTableMetaClient, List<String>, Void>)
(val1, val2) -> {
+ updateColumnsToIndexWithColStats(val1, val2);
+ return null;
+ });
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " +
config.getBasePath() + " at time " + instantTime,
e);
}
}
+ /**
+ * Updates the list of columns indexed with col stats index in Metadata
table.
+ * @param metaClient instance of {@link HoodieTableMetaClient} of interest.
+ * @param columnsToIndex list of columns to index.
+ */
+ protected abstract void
updateColumnsToIndexWithColStats(HoodieTableMetaClient metaClient, List<String>
columnsToIndex);
Review Comment:
```suggestion
protected abstract void
updateColumnsToIndexForColumnStats(HoodieTableMetaClient metaClient,
List<String> columnsToIndex);
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -230,12 +233,25 @@ protected void commit(HoodieWriteMetadata<O> result,
List<HoodieWriteStat> write
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(),
metadata));
LOG.info("Committed " + instantTime);
result.setCommitMetadata(Option.of(metadata));
+ // update cols to Index as applicable
+ HoodieColStatsIndexUtils.updateColsToIndex(table, config, metadata,
+ (Functions.Function2<HoodieTableMetaClient, List<String>, Void>)
(val1, val2) -> {
+ updateColumnsToIndexWithColStats(val1, val2);
+ return null;
+ });
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " +
config.getBasePath() + " at time " + instantTime,
e);
}
}
+ /**
+ * Updates the list of columns indexed with col stats index in Metadata
table.
+ * @param metaClient instance of {@link HoodieTableMetaClient} of interest.
+ * @param columnsToIndex list of columns to index.
+ */
+ protected abstract void
updateColumnsToIndexWithColStats(HoodieTableMetaClient metaClient, List<String>
columnsToIndex);
Review Comment:
The concrete impl of this API and that of
`BaseHoodieClient.updateColumnsToIndexWithColStats` is the same (at least for
Spark). Do we still need this API? By the way, isn't colstats supported for
Flink?
Regardless, I think engine-specific methods for updating the index definition
do not fit well. Do we have such examples for updating table config?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1364,24 +1342,47 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
}
}
+ public static List<String> getColumnsToIndex(HoodieCommitMetadata
commitMetadata,
+ HoodieTableMetaClient dataMetaClient,
+ HoodieMetadataConfig metadataConfig) {
Review Comment:
nit: indentation
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -230,12 +233,25 @@ protected void commit(HoodieWriteMetadata<O> result,
List<HoodieWriteStat> write
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(),
metadata));
LOG.info("Committed " + instantTime);
result.setCommitMetadata(Option.of(metadata));
+ // update cols to Index as applicable
+ HoodieColStatsIndexUtils.updateColsToIndex(table, config, metadata,
+ (Functions.Function2<HoodieTableMetaClient, List<String>, Void>)
(val1, val2) -> {
+ updateColumnsToIndexWithColStats(val1, val2);
+ return null;
Review Comment:
same here - if we can avoid returning null, it would be nice.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColStatsIndexUtils.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+
+/**
+ * Utils to assist with updating columns to index with col stats.
+ */
+public class HoodieColStatsIndexUtils {
+
+ /**
+ * Updates the list of columns to index with col stats partition in MDT.
+ * @param dataTable {@link HoodieTable} of interest.
+ * @param config {@link HoodieWriteConfig} of interest.
+ * @param commitMetadata commit metadata of interest.
+ * @param updateColSatsFunc function to assist with updating columns to
index.
+ */
+ @VisibleForTesting
+ public static void updateColsToIndex(HoodieTable dataTable,
HoodieWriteConfig config, HoodieCommitMetadata commitMetadata,
+ Functions.Function2<HoodieTableMetaClient,
List<String>, Void> updateColSatsFunc) {
+ if (config.getMetadataConfig().isColumnStatsIndexEnabled()) {
+ dataTable.getMetaClient().reloadTableConfig();
+ if
(dataTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()))
{
+ try {
+ HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
+ .setStorage(dataTable.getStorage())
+
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(dataTable.getMetaClient().getBasePath()))
+ .build();
+ HoodieInstant latestInstant =
mdtMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant().get();
+ final HoodieCommitMetadata mdtCommitMetadata =
mdtMetaClient.getTimelineLayout().getCommitMetadataSerDe().deserialize(
+ latestInstant,
+
mdtMetaClient.getActiveTimeline().getInstantDetails(latestInstant).get(),
+ HoodieCommitMetadata.class);
+ if
(mdtCommitMetadata.getPartitionToWriteStats().containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath()))
{
+ // update data table's table config for list of columns indexed.
+ List<String> columnsToIndex =
HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata,
dataTable.getMetaClient(), config.getMetadataConfig());
+ // if col stats is getting updated, lets also update list of
columns indexed if changed.
+ updateColSatsFunc.apply(dataTable.getMetaClient(), columnsToIndex);
+ }
+ } catch (Exception e) {
+ throw new HoodieException("Updating data table config to latest set
of columns indexed with col stats failed ", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Deletes col stats index definition for the given table of interest.
+ * @param dataTableMetaClient {@link HoodieTableMetaClient} instance for the
data table.
+ */
+ @VisibleForTesting
+ public static void deleteColStatsIndexDefn(HoodieTableMetaClient
dataTableMetaClient) {
Review Comment:
```suggestion
public static void deleteColumnStatsIndexDefinition(HoodieTableMetaClient
dataTableMetaClient) {
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColStatsIndexUtils.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+
+/**
+ * Utils to assist with updating columns to index with col stats.
+ */
+public class HoodieColStatsIndexUtils {
Review Comment:
Also, what do you think about moving it to hudi-common? It is similar to a table config update.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieSparkIndex.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.HoodieSparkIndexClient;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static
org.apache.hudi.index.expression.ExpressionIndexSparkFunctions.IDENTITY_FUNCTION;
+import static
org.apache.hudi.index.expression.HoodieExpressionIndex.RANGE_TYPE;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_EXPRESSION_INDEX;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_EXPRESSION_INDEX_PREFIX;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieSparkIndex extends HoodieClientTestBase {
+
+ @Test
+ public void testIndexCreateAndDrop() throws IOException {
+ HoodieWriteConfig cfg = getConfigBuilder().build();
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0x17AB);
+
+ initMetaClient(HoodieTableType.COPY_ON_WRITE);
+
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ String commitTime1 = "001";
+ client.startCommitWithTime(commitTime1);
+ List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
+ JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(records1, 1);
+ List<WriteStatus> statuses1 = client.upsert(writeRecords1,
commitTime1).collect();
+ assertNoWriteErrors(statuses1);
+ }
+
+ HoodieSparkIndexClient sparkIndexClient = new HoodieSparkIndexClient(cfg,
context);
+
+ // add 4 indices(2 SI and 2 EI)
+ String indexNamePrefix = "index";
+ String fieldNamePrefix = "field";
+ for (int i = 0; i < 5; i++) {
+ String indexName = indexNamePrefix + "_" + i;
+ String fieldName = fieldNamePrefix + "_" + i;
+ HoodieIndexDefinition indexDefinition = getHoodieIndexDefn(indexName,
+ i % 2 == 0 ? PARTITION_NAME_SECONDARY_INDEX :
PARTITION_NAME_EXPRESSION_INDEX, fieldName);
+ sparkIndexClient.register(metaClient, indexDefinition);
+ readAndValidateIndexDefn(indexDefinition);
+ }
+
+ // add col stats index.
+ HoodieIndexDefinition colStatsIndexDefinition =
getHoodieIndexDefn(PARTITION_NAME_COLUMN_STATS,
+ RANGE_TYPE, fieldNamePrefix + "_5");
+ sparkIndexClient.register(metaClient, colStatsIndexDefinition);
+ readAndValidateIndexDefn(colStatsIndexDefinition);
+
+ // drop one among sec index and expression index.
+ metaClient.deleteIndexDefinition(getIndexFullName(indexNamePrefix + "_1",
PARTITION_NAME_EXPRESSION_INDEX));
+ metaClient.deleteIndexDefinition(getIndexFullName(indexNamePrefix + "_2",
PARTITION_NAME_SECONDARY_INDEX));
+
+ //validate rest.
+ HoodieTableMetaClient newMetaClient =
HoodieTableMetaClient.builder().setBasePath(metaClient.getBasePath()).setConf(metaClient.getStorageConf())
+ .build();
+ HoodieIndexDefinition indexDefinition = getHoodieIndexDefn(indexNamePrefix
+ "_0", PARTITION_NAME_SECONDARY_INDEX, fieldNamePrefix + "_0");
+ readAndValidateIndexDefn(indexDefinition, newMetaClient);
+ indexDefinition = getHoodieIndexDefn(indexNamePrefix + "_3",
PARTITION_NAME_EXPRESSION_INDEX, fieldNamePrefix + "_3");
+ readAndValidateIndexDefn(indexDefinition, newMetaClient);
+ indexDefinition = getHoodieIndexDefn(indexNamePrefix + "_4",
PARTITION_NAME_SECONDARY_INDEX, fieldNamePrefix + "_4");
+ readAndValidateIndexDefn(indexDefinition, newMetaClient);
+ readAndValidateIndexDefn(colStatsIndexDefinition);
+
+ // update col stats w/ new set of cols
+ List<String> colsToIndex = IntStream.range(0, 10).mapToObj(number ->
fieldNamePrefix + "_" + number).collect(Collectors.toList());
+ colStatsIndexDefinition = getHoodieIndexDefn(PARTITION_NAME_COLUMN_STATS,
+ RANGE_TYPE, RANGE_TYPE, colsToIndex, Collections.EMPTY_MAP);
+ sparkIndexClient.register(metaClient, colStatsIndexDefinition);
+ readAndValidateIndexDefn(colStatsIndexDefinition);
+
+ // drop col stats
+ metaClient.deleteIndexDefinition(colStatsIndexDefinition.getIndexName());
+ readAndValidateIndexDefnNotPresent(colStatsIndexDefinition,
HoodieTableMetaClient.builder().setBasePath(metaClient.getBasePath()).setConf(metaClient.getStorageConf())
+ .build());
+ }
+
+ private void readAndValidateIndexDefn(HoodieIndexDefinition
expectedIndexDefn) {
+ HoodieTableMetaClient newMetaClient =
HoodieTableMetaClient.builder().setBasePath(metaClient.getBasePath()).setConf(metaClient.getStorageConf())
+ .build();
+ readAndValidateIndexDefn(expectedIndexDefn, newMetaClient);
+ }
+
+ private void readAndValidateIndexDefn(HoodieIndexDefinition
expectedIndexDefn, HoodieTableMetaClient metaClient) {
+ assertTrue(metaClient.getIndexMetadata().isPresent());
+
assertTrue(!metaClient.getIndexMetadata().get().getIndexDefinitions().isEmpty());
+
assertTrue(metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(expectedIndexDefn.getIndexName()));
+ assertEquals(expectedIndexDefn,
metaClient.getIndexMetadata().get().getIndexDefinitions().get(expectedIndexDefn.getIndexName()));
+ }
+
+ private void readAndValidateIndexDefnNotPresent(HoodieIndexDefinition
expectedIndexDefn, HoodieTableMetaClient metaClient) {
+ assertTrue(metaClient.getIndexMetadata().isPresent());
+
assertTrue(!metaClient.getIndexMetadata().get().getIndexDefinitions().isEmpty());
+
assertTrue(!metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(expectedIndexDefn.getIndexName()));
+ }
+
+ private HoodieIndexDefinition getHoodieIndexDefn(String indexName, String
indexType, String sourceField) {
Review Comment:
also, for other methods, let's rename `*HoodieIndexDefn*` to
`*IndexDefinition*`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]