codope commented on code in PR #12529:
URL: https://github.com/apache/hudi/pull/12529#discussion_r1899960031


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -296,6 +297,12 @@ protected void commit(HoodieTable table, String 
commitActionType, String instant
     writeTableMetadata(table, instantTime, metadata);
     activeTimeline.saveAsComplete(false, 
table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, 
commitActionType, instantTime),
         
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), 
metadata));
+    // update cols to Index as applicable
+    HoodieColStatsIndexUtils.updateColsToIndex(table, config, metadata,
+        (Functions.Function2<HoodieTableMetaClient, List<String>, Void>) 
(val1, val2) -> {
+          updateColumnsToIndexWithColStats(val1, val2);
+          return null;

Review Comment:
   Is there a better way than using `Function`, one that avoids returning null?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColStatsIndexUtils.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+
+/**
+ * Utils to assist with updating columns to index with col stats.
+ */
+public class HoodieColStatsIndexUtils {

Review Comment:
   Let's rename the class to `HoodieColumnStatsIndexUtils`, and wherever using 
`*ColStats*` in class or method names can be renamed to `*ColumnStats*`.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -230,12 +233,25 @@ protected void commit(HoodieWriteMetadata<O> result, 
List<HoodieWriteStat> write
           
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), 
metadata));
       LOG.info("Committed " + instantTime);
       result.setCommitMetadata(Option.of(metadata));
+      // update cols to Index as applicable
+      HoodieColStatsIndexUtils.updateColsToIndex(table, config, metadata,
+          (Functions.Function2<HoodieTableMetaClient, List<String>, Void>) 
(val1, val2) -> {

Review Comment:
   ```suggestion
             (Functions.Function2<HoodieTableMetaClient, List<String>, Void>) 
(metaClient, columnsToIndex) -> {
   ```



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -296,6 +297,12 @@ protected void commit(HoodieTable table, String 
commitActionType, String instant
     writeTableMetadata(table, instantTime, metadata);
     activeTimeline.saveAsComplete(false, 
table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, 
commitActionType, instantTime),
         
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), 
metadata));
+    // update cols to Index as applicable
+    HoodieColStatsIndexUtils.updateColsToIndex(table, config, metadata,

Review Comment:
   Can you please remind me why we are updating the columns after writing the data 
commit file? What if we moved this into `writeTableMetadata`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColStatsIndexUtils.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+
+/**
+ * Utils to assist with updating columns to index with col stats.
+ */
+public class HoodieColStatsIndexUtils {
+
+  /**
+   * Updates the list of columns to index with col stats partition in MDT.
+   * @param dataTable {@link HoodieTable} of interest.
+   * @param config {@link HoodieWriteConfig} of interest.
+   * @param commitMetadata commit metadata of interest.
+   * @param updateColSatsFunc function to assist with updating columns to 
index.
+   */
+  @VisibleForTesting
+  public static void updateColsToIndex(HoodieTable dataTable, 
HoodieWriteConfig config, HoodieCommitMetadata commitMetadata,
+                                Functions.Function2<HoodieTableMetaClient, 
List<String>, Void> updateColSatsFunc) {
+    if (config.getMetadataConfig().isColumnStatsIndexEnabled()) {
+      dataTable.getMetaClient().reloadTableConfig();
+      if 
(dataTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()))
 {
+        try {
+          HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
+              .setStorage(dataTable.getStorage())
+              
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(dataTable.getMetaClient().getBasePath()))
+              .build();
+          HoodieInstant latestInstant = 
mdtMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant().get();
+          final HoodieCommitMetadata mdtCommitMetadata = 
mdtMetaClient.getTimelineLayout().getCommitMetadataSerDe().deserialize(
+              latestInstant,
+              
mdtMetaClient.getActiveTimeline().getInstantDetails(latestInstant).get(),
+              HoodieCommitMetadata.class);
+          if 
(mdtCommitMetadata.getPartitionToWriteStats().containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath()))
 {

Review Comment:
   if we move the call inside `writeTableMetadata`, do we still need to do all 
of this? We have the metadata writer in that method, right?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieSparkIndex.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.HoodieSparkIndexClient;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.hudi.index.expression.ExpressionIndexSparkFunctions.IDENTITY_FUNCTION;
+import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.RANGE_TYPE;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_EXPRESSION_INDEX;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_EXPRESSION_INDEX_PREFIX;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieSparkIndex extends HoodieClientTestBase {
+
+  @Test
+  public void testIndexCreateAndDrop() throws IOException {
+    HoodieWriteConfig cfg = getConfigBuilder().build();
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0x17AB);
+
+    initMetaClient(HoodieTableType.COPY_ON_WRITE);
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      String commitTime1 = "001";
+      client.startCommitWithTime(commitTime1);
+      List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
+      JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(records1, 1);
+      List<WriteStatus> statuses1 = client.upsert(writeRecords1, 
commitTime1).collect();
+      assertNoWriteErrors(statuses1);
+    }
+
+    HoodieSparkIndexClient sparkIndexClient = new HoodieSparkIndexClient(cfg, 
context);
+
+    // add 4 indices(2 SI and 2 EI)
+    String indexNamePrefix = "index";
+    String fieldNamePrefix = "field";
+    for (int i = 0; i < 5; i++) {
+      String indexName = indexNamePrefix + "_" + i;
+      String fieldName = fieldNamePrefix + "_" + i;
+      HoodieIndexDefinition indexDefinition = getHoodieIndexDefn(indexName,
+          i % 2 == 0 ? PARTITION_NAME_SECONDARY_INDEX : 
PARTITION_NAME_EXPRESSION_INDEX, fieldName);
+      sparkIndexClient.register(metaClient, indexDefinition);
+      readAndValidateIndexDefn(indexDefinition);
+    }
+
+    // add col stats index.
+    HoodieIndexDefinition colStatsIndexDefinition = 
getHoodieIndexDefn(PARTITION_NAME_COLUMN_STATS,
+        RANGE_TYPE, fieldNamePrefix + "_5");
+    sparkIndexClient.register(metaClient, colStatsIndexDefinition);
+    readAndValidateIndexDefn(colStatsIndexDefinition);
+
+    // drop one among sec index and expression index.
+    metaClient.deleteIndexDefinition(getIndexFullName(indexNamePrefix + "_1", 
PARTITION_NAME_EXPRESSION_INDEX));
+    metaClient.deleteIndexDefinition(getIndexFullName(indexNamePrefix + "_2", 
PARTITION_NAME_SECONDARY_INDEX));
+
+    //validate rest.
+    HoodieTableMetaClient newMetaClient = 
HoodieTableMetaClient.builder().setBasePath(metaClient.getBasePath()).setConf(metaClient.getStorageConf())
+        .build();
+    HoodieIndexDefinition indexDefinition = getHoodieIndexDefn(indexNamePrefix 
+ "_0", PARTITION_NAME_SECONDARY_INDEX, fieldNamePrefix + "_0");
+    readAndValidateIndexDefn(indexDefinition, newMetaClient);
+    indexDefinition = getHoodieIndexDefn(indexNamePrefix + "_3", 
PARTITION_NAME_EXPRESSION_INDEX, fieldNamePrefix + "_3");
+    readAndValidateIndexDefn(indexDefinition, newMetaClient);
+    indexDefinition = getHoodieIndexDefn(indexNamePrefix + "_4", 
PARTITION_NAME_SECONDARY_INDEX, fieldNamePrefix + "_4");
+    readAndValidateIndexDefn(indexDefinition, newMetaClient);
+    readAndValidateIndexDefn(colStatsIndexDefinition);
+
+    // update col stats w/ new set of cols
+    List<String> colsToIndex = IntStream.range(0, 10).mapToObj(number -> 
fieldNamePrefix + "_" + number).collect(Collectors.toList());
+    colStatsIndexDefinition = getHoodieIndexDefn(PARTITION_NAME_COLUMN_STATS,
+        RANGE_TYPE, RANGE_TYPE, colsToIndex, Collections.EMPTY_MAP);
+    sparkIndexClient.register(metaClient, colStatsIndexDefinition);
+    readAndValidateIndexDefn(colStatsIndexDefinition);
+
+    // drop col stats
+    metaClient.deleteIndexDefinition(colStatsIndexDefinition.getIndexName());
+    readAndValidateIndexDefnNotPresent(colStatsIndexDefinition, 
HoodieTableMetaClient.builder().setBasePath(metaClient.getBasePath()).setConf(metaClient.getStorageConf())
+        .build());
+  }
+
+  private void readAndValidateIndexDefn(HoodieIndexDefinition 
expectedIndexDefn) {
+    HoodieTableMetaClient newMetaClient = 
HoodieTableMetaClient.builder().setBasePath(metaClient.getBasePath()).setConf(metaClient.getStorageConf())
+        .build();
+    readAndValidateIndexDefn(expectedIndexDefn, newMetaClient);
+  }
+
+  private void readAndValidateIndexDefn(HoodieIndexDefinition 
expectedIndexDefn, HoodieTableMetaClient metaClient) {
+    assertTrue(metaClient.getIndexMetadata().isPresent());
+    
assertTrue(!metaClient.getIndexMetadata().get().getIndexDefinitions().isEmpty());
+    
assertTrue(metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(expectedIndexDefn.getIndexName()));
+    assertEquals(expectedIndexDefn, 
metaClient.getIndexMetadata().get().getIndexDefinitions().get(expectedIndexDefn.getIndexName()));
+  }
+
+  private void readAndValidateIndexDefnNotPresent(HoodieIndexDefinition 
expectedIndexDefn, HoodieTableMetaClient metaClient) {
+    assertTrue(metaClient.getIndexMetadata().isPresent());
+    
assertTrue(!metaClient.getIndexMetadata().get().getIndexDefinitions().isEmpty());
+    
assertTrue(!metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(expectedIndexDefn.getIndexName()));
+  }
+
+  private HoodieIndexDefinition getHoodieIndexDefn(String indexName, String 
indexType, String sourceField) {

Review Comment:
   ```suggestion
     private HoodieIndexDefinition getIndexDefinition(String indexName, String 
indexType, String sourceField) {
   ```



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1683,11 +1688,11 @@ private static Comparable<?> coerceToComparable(Schema 
schema, Object val) {
   }
 
   private static boolean isColumnTypeSupported(Schema schema, 
Option<HoodieRecordType> recordType) {
-    // if record type is set and if its AVRO, MAP is unsupported.
-    if (recordType.isPresent() && recordType.get() == HoodieRecordType.AVRO) {
+    // if record type is not set or if its AVRO, MAP is unsupported.
+    if (!recordType.isPresent() || recordType.get() == HoodieRecordType.AVRO) {

Review Comment:
   let's add a test for:
   a) nested field
   b) field configured to index in metadata config but its data type is not 
supported. In this case, I expect to either error out with no update to the 
index definition, or only update the supported fields in the index definition.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -296,6 +297,12 @@ protected void commit(HoodieTable table, String 
commitActionType, String instant
     writeTableMetadata(table, instantTime, metadata);
     activeTimeline.saveAsComplete(false, 
table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, 
commitActionType, instantTime),
         
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), 
metadata));
+    // update cols to Index as applicable
+    HoodieColStatsIndexUtils.updateColsToIndex(table, config, metadata,
+        (Functions.Function2<HoodieTableMetaClient, List<String>, Void>) 
(val1, val2) -> {

Review Comment:
   ```suggestion
           (Functions.Function2<HoodieTableMetaClient, List<String>, Void>) 
(metaClient, columnsToIndex) -> {
   ```



##########
hudi-common/src/main/java/org/apache/hudi/index/expression/HoodieExpressionIndex.java:
##########
@@ -45,6 +45,8 @@ public interface HoodieExpressionIndex<S, T> extends 
Serializable {
   String DAYS_OPTION = "days";
   String FORMAT_OPTION = "format";
 
+  String RANGE_TYPE = "range";

Review Comment:
   Can't we use `column_stats` as the index type? It would be consistent with EI. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -230,12 +233,25 @@ protected void commit(HoodieWriteMetadata<O> result, 
List<HoodieWriteStat> write
           
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), 
metadata));
       LOG.info("Committed " + instantTime);
       result.setCommitMetadata(Option.of(metadata));
+      // update cols to Index as applicable
+      HoodieColStatsIndexUtils.updateColsToIndex(table, config, metadata,
+          (Functions.Function2<HoodieTableMetaClient, List<String>, Void>) 
(val1, val2) -> {
+            updateColumnsToIndexWithColStats(val1, val2);
+            return null;
+          });
     } catch (IOException e) {
       throw new HoodieCommitException("Failed to complete commit " + 
config.getBasePath() + " at time " + instantTime,
           e);
     }
   }
 
+  /**
+   * Updates the list of columns indexed with col stats index in Metadata 
table.
+   * @param metaClient instance of {@link HoodieTableMetaClient} of interest.
+   * @param columnsToIndex list of columns to index.
+   */
+  protected abstract void 
updateColumnsToIndexWithColStats(HoodieTableMetaClient metaClient, List<String> 
columnsToIndex);

Review Comment:
   ```suggestion
     protected abstract void 
updateColumnsToIndexForColumnStats(HoodieTableMetaClient metaClient, 
List<String> columnsToIndex);
   ```



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -230,12 +233,25 @@ protected void commit(HoodieWriteMetadata<O> result, 
List<HoodieWriteStat> write
           
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), 
metadata));
       LOG.info("Committed " + instantTime);
       result.setCommitMetadata(Option.of(metadata));
+      // update cols to Index as applicable
+      HoodieColStatsIndexUtils.updateColsToIndex(table, config, metadata,
+          (Functions.Function2<HoodieTableMetaClient, List<String>, Void>) 
(val1, val2) -> {
+            updateColumnsToIndexWithColStats(val1, val2);
+            return null;
+          });
     } catch (IOException e) {
       throw new HoodieCommitException("Failed to complete commit " + 
config.getBasePath() + " at time " + instantTime,
           e);
     }
   }
 
+  /**
+   * Updates the list of columns indexed with col stats index in Metadata 
table.
+   * @param metaClient instance of {@link HoodieTableMetaClient} of interest.
+   * @param columnsToIndex list of columns to index.
+   */
+  protected abstract void 
updateColumnsToIndexWithColStats(HoodieTableMetaClient metaClient, List<String> 
columnsToIndex);

Review Comment:
   The concrete impl of this API and that of 
`BaseHoodieClient.updateColumnsToIndexWithColStats` is the same (at least for 
Spark). Do we still need this API? By the way, isn't column stats supported for 
Flink?
   
   Regardless, I think engine specific methods for updating index definition 
does not fit well. Do we have such examples for updating table config?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1364,24 +1342,47 @@ public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(Hoodi
     }
   }
 
+  public static List<String> getColumnsToIndex(HoodieCommitMetadata 
commitMetadata,
+                                         HoodieTableMetaClient dataMetaClient,
+                                         HoodieMetadataConfig metadataConfig) {

Review Comment:
   nit: indentation



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -230,12 +233,25 @@ protected void commit(HoodieWriteMetadata<O> result, 
List<HoodieWriteStat> write
           
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), 
metadata));
       LOG.info("Committed " + instantTime);
       result.setCommitMetadata(Option.of(metadata));
+      // update cols to Index as applicable
+      HoodieColStatsIndexUtils.updateColsToIndex(table, config, metadata,
+          (Functions.Function2<HoodieTableMetaClient, List<String>, Void>) 
(val1, val2) -> {
+            updateColumnsToIndexWithColStats(val1, val2);
+            return null;

Review Comment:
   Same here — if we can avoid returning null, that would be nice.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColStatsIndexUtils.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+
+/**
+ * Utils to assist with updating columns to index with col stats.
+ */
+public class HoodieColStatsIndexUtils {
+
+  /**
+   * Updates the list of columns to index with col stats partition in MDT.
+   * @param dataTable {@link HoodieTable} of interest.
+   * @param config {@link HoodieWriteConfig} of interest.
+   * @param commitMetadata commit metadata of interest.
+   * @param updateColSatsFunc function to assist with updating columns to 
index.
+   */
+  @VisibleForTesting
+  public static void updateColsToIndex(HoodieTable dataTable, 
HoodieWriteConfig config, HoodieCommitMetadata commitMetadata,
+                                Functions.Function2<HoodieTableMetaClient, 
List<String>, Void> updateColSatsFunc) {
+    if (config.getMetadataConfig().isColumnStatsIndexEnabled()) {
+      dataTable.getMetaClient().reloadTableConfig();
+      if 
(dataTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()))
 {
+        try {
+          HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
+              .setStorage(dataTable.getStorage())
+              
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(dataTable.getMetaClient().getBasePath()))
+              .build();
+          HoodieInstant latestInstant = 
mdtMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant().get();
+          final HoodieCommitMetadata mdtCommitMetadata = 
mdtMetaClient.getTimelineLayout().getCommitMetadataSerDe().deserialize(
+              latestInstant,
+              
mdtMetaClient.getActiveTimeline().getInstantDetails(latestInstant).get(),
+              HoodieCommitMetadata.class);
+          if 
(mdtCommitMetadata.getPartitionToWriteStats().containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath()))
 {
+            // update data table's table config for list of columns indexed.
+            List<String> columnsToIndex = 
HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, 
dataTable.getMetaClient(), config.getMetadataConfig());
+            // if col stats is getting updated, lets also update list of 
columns indexed if changed.
+            updateColSatsFunc.apply(dataTable.getMetaClient(), columnsToIndex);
+          }
+        } catch (Exception e) {
+          throw new HoodieException("Updating data table config to latest set 
of columns indexed with col stats failed ", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Deletes col stats index definition for the given table of interest.
+   * @param dataTableMetaClient {@link HoodieTableMetaClient} instance for the 
data table.
+   */
+  @VisibleForTesting
+  public static void deleteColStatsIndexDefn(HoodieTableMetaClient 
dataTableMetaClient) {

Review Comment:
   ```suggestion
     public static void deleteColumnStatsIndexDefinition(HoodieTableMetaClient 
dataTableMetaClient) {
   ```



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColStatsIndexUtils.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+
+/**
+ * Utils to assist with updating columns to index with col stats.
+ */
+public class HoodieColStatsIndexUtils {

Review Comment:
   Also, what do you think about moving it to hudi-common? It's similar to a table config update.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieSparkIndex.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.HoodieSparkIndexClient;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.hudi.index.expression.ExpressionIndexSparkFunctions.IDENTITY_FUNCTION;
+import static 
org.apache.hudi.index.expression.HoodieExpressionIndex.RANGE_TYPE;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_EXPRESSION_INDEX;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_EXPRESSION_INDEX_PREFIX;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieSparkIndex extends HoodieClientTestBase {
+
+  @Test
+  public void testIndexCreateAndDrop() throws IOException {
+    HoodieWriteConfig cfg = getConfigBuilder().build();
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0x17AB);
+
+    initMetaClient(HoodieTableType.COPY_ON_WRITE);
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      String commitTime1 = "001";
+      client.startCommitWithTime(commitTime1);
+      List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
+      JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(records1, 1);
+      List<WriteStatus> statuses1 = client.upsert(writeRecords1, 
commitTime1).collect();
+      assertNoWriteErrors(statuses1);
+    }
+
+    HoodieSparkIndexClient sparkIndexClient = new HoodieSparkIndexClient(cfg, 
context);
+
+    // add 4 indices(2 SI and 2 EI)
+    String indexNamePrefix = "index";
+    String fieldNamePrefix = "field";
+    for (int i = 0; i < 5; i++) {
+      String indexName = indexNamePrefix + "_" + i;
+      String fieldName = fieldNamePrefix + "_" + i;
+      HoodieIndexDefinition indexDefinition = getHoodieIndexDefn(indexName,
+          i % 2 == 0 ? PARTITION_NAME_SECONDARY_INDEX : 
PARTITION_NAME_EXPRESSION_INDEX, fieldName);
+      sparkIndexClient.register(metaClient, indexDefinition);
+      readAndValidateIndexDefn(indexDefinition);
+    }
+
+    // add col stats index.
+    HoodieIndexDefinition colStatsIndexDefinition = 
getHoodieIndexDefn(PARTITION_NAME_COLUMN_STATS,
+        RANGE_TYPE, fieldNamePrefix + "_5");
+    sparkIndexClient.register(metaClient, colStatsIndexDefinition);
+    readAndValidateIndexDefn(colStatsIndexDefinition);
+
+    // drop one among sec index and expression index.
+    metaClient.deleteIndexDefinition(getIndexFullName(indexNamePrefix + "_1", 
PARTITION_NAME_EXPRESSION_INDEX));
+    metaClient.deleteIndexDefinition(getIndexFullName(indexNamePrefix + "_2", 
PARTITION_NAME_SECONDARY_INDEX));
+
+    //validate rest.
+    HoodieTableMetaClient newMetaClient = 
HoodieTableMetaClient.builder().setBasePath(metaClient.getBasePath()).setConf(metaClient.getStorageConf())
+        .build();
+    HoodieIndexDefinition indexDefinition = getHoodieIndexDefn(indexNamePrefix 
+ "_0", PARTITION_NAME_SECONDARY_INDEX, fieldNamePrefix + "_0");
+    readAndValidateIndexDefn(indexDefinition, newMetaClient);
+    indexDefinition = getHoodieIndexDefn(indexNamePrefix + "_3", 
PARTITION_NAME_EXPRESSION_INDEX, fieldNamePrefix + "_3");
+    readAndValidateIndexDefn(indexDefinition, newMetaClient);
+    indexDefinition = getHoodieIndexDefn(indexNamePrefix + "_4", 
PARTITION_NAME_SECONDARY_INDEX, fieldNamePrefix + "_4");
+    readAndValidateIndexDefn(indexDefinition, newMetaClient);
+    readAndValidateIndexDefn(colStatsIndexDefinition);
+
+    // update col stats w/ new set of cols
+    List<String> colsToIndex = IntStream.range(0, 10).mapToObj(number -> 
fieldNamePrefix + "_" + number).collect(Collectors.toList());
+    colStatsIndexDefinition = getHoodieIndexDefn(PARTITION_NAME_COLUMN_STATS,
+        RANGE_TYPE, RANGE_TYPE, colsToIndex, Collections.EMPTY_MAP);
+    sparkIndexClient.register(metaClient, colStatsIndexDefinition);
+    readAndValidateIndexDefn(colStatsIndexDefinition);
+
+    // drop col stats
+    metaClient.deleteIndexDefinition(colStatsIndexDefinition.getIndexName());
+    readAndValidateIndexDefnNotPresent(colStatsIndexDefinition, 
HoodieTableMetaClient.builder().setBasePath(metaClient.getBasePath()).setConf(metaClient.getStorageConf())
+        .build());
+  }
+
+  private void readAndValidateIndexDefn(HoodieIndexDefinition 
expectedIndexDefn) {
+    HoodieTableMetaClient newMetaClient = 
HoodieTableMetaClient.builder().setBasePath(metaClient.getBasePath()).setConf(metaClient.getStorageConf())
+        .build();
+    readAndValidateIndexDefn(expectedIndexDefn, newMetaClient);
+  }
+
+  private void readAndValidateIndexDefn(HoodieIndexDefinition 
expectedIndexDefn, HoodieTableMetaClient metaClient) {
+    assertTrue(metaClient.getIndexMetadata().isPresent());
+    
assertTrue(!metaClient.getIndexMetadata().get().getIndexDefinitions().isEmpty());
+    
assertTrue(metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(expectedIndexDefn.getIndexName()));
+    assertEquals(expectedIndexDefn, 
metaClient.getIndexMetadata().get().getIndexDefinitions().get(expectedIndexDefn.getIndexName()));
+  }
+
+  private void readAndValidateIndexDefnNotPresent(HoodieIndexDefinition 
expectedIndexDefn, HoodieTableMetaClient metaClient) {
+    assertTrue(metaClient.getIndexMetadata().isPresent());
+    
assertTrue(!metaClient.getIndexMetadata().get().getIndexDefinitions().isEmpty());
+    
assertTrue(!metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(expectedIndexDefn.getIndexName()));
+  }
+
+  private HoodieIndexDefinition getHoodieIndexDefn(String indexName, String 
indexType, String sourceField) {

Review Comment:
   Also, for the other methods, let's rename `*HoodieIndexDefn*` to 
`*IndexDefinition*` for consistency.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to