This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 46d2a30dffe [HUDI-7958] Create partition stats index for all columns 
when no cols specified (#11579)
46d2a30dffe is described below

commit 46d2a30dffed6db5d8f02bfe830ee77c3623c3d7
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Sep 6 06:22:21 2024 +0530

    [HUDI-7958] Create partition stats index for all columns when no cols 
specified (#11579)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |   4 -
 .../hudi/common/config/HoodieMetadataConfig.java   |   2 +-
 .../hudi/metadata/HoodieMetadataPayload.java       |  16 +--
 .../hudi/metadata/HoodieTableMetadataUtil.java     |   8 +-
 .../hudi/metadata/MetadataPartitionType.java       |  22 ++--
 .../sink/TestStreamWriteOperatorCoordinator.java   |  43 +++++---
 .../TestPartitionStatsIndexWithSql.scala           | 112 +++++++++++++++------
 .../hudi/dml/TestHoodieTableValuedFunction.scala   |  27 +++--
 8 files changed, 163 insertions(+), 71 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 178c1facf22..e9657e7145d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -406,10 +406,6 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
             fileGroupCountAndRecordsPair = 
initializeFunctionalIndexPartition(functionalIndexPartitionsToInit.iterator().next());
             break;
           case PARTITION_STATS:
-            if 
(dataWriteConfig.getColumnsEnabledForColumnStatsIndex().isEmpty()) {
-              LOG.warn("Skipping partition stats index initialization as 
target columns are not set");
-              continue;
-            }
             fileGroupCountAndRecordsPair = 
initializePartitionStatsIndex(partitionInfoList);
             break;
           case SECONDARY_INDEX:
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 7834a48f674..40e2ab94d41 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -332,7 +332,7 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
 
   public static final ConfigProperty<Boolean> 
ENABLE_METADATA_INDEX_PARTITION_STATS = ConfigProperty
       .key(METADATA_PREFIX + ".index.partition.stats.enable")
-      .defaultValue(true)
+      .defaultValue(false)
       .sinceVersion("1.0.0")
       .withDocumentation("Enable aggregating stats for each column at the 
storage partition level.");
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index e4a3420aeba..d86f6c101ee 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -212,8 +212,8 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
     this(key, MetadataPartitionType.BLOOM_FILTERS.getRecordType(), null, 
metadataBloomFilter, null, null, null);
   }
 
-  protected HoodieMetadataPayload(String key, HoodieMetadataColumnStats 
columnStats) {
-    this(key, MetadataPartitionType.COLUMN_STATS.getRecordType(), null, null, 
columnStats, null, null);
+  protected HoodieMetadataPayload(String key, HoodieMetadataColumnStats 
columnStats, int recordType) {
+    this(key, recordType, null, null, columnStats, null, null);
   }
 
   private HoodieMetadataPayload(String key, HoodieRecordIndexInfo 
recordIndexMetadata) {
@@ -482,7 +482,8 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
       HoodieKey key = new HoodieKey(getColumnStatsIndexKey(partitionName, 
columnRangeMetadata),
           MetadataPartitionType.COLUMN_STATS.getPartitionPath());
 
-      HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(),
+      HoodieMetadataPayload payload = new HoodieMetadataPayload(
+          key.getRecordKey(),
           HoodieMetadataColumnStats.newBuilder()
               .setFileName(new 
StoragePath(columnRangeMetadata.getFilePath()).getName())
               .setColumnName(columnRangeMetadata.getColumnName())
@@ -493,7 +494,8 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
               .setTotalSize(columnRangeMetadata.getTotalSize())
               
.setTotalUncompressedSize(columnRangeMetadata.getTotalUncompressedSize())
               .setIsDeleted(isDeleted)
-              .build());
+              .build(),
+          MetadataPartitionType.COLUMN_STATS.getRecordType());
 
       return new HoodieAvroRecord<>(key, payload);
     });
@@ -505,7 +507,8 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
     return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
       HoodieKey key = new HoodieKey(getPartitionStatsIndexKey(partitionPath, 
columnRangeMetadata.getColumnName()),
           MetadataPartitionType.PARTITION_STATS.getPartitionPath());
-      HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(),
+      HoodieMetadataPayload payload = new HoodieMetadataPayload(
+          key.getRecordKey(),
           HoodieMetadataColumnStats.newBuilder()
               .setFileName(columnRangeMetadata.getFilePath())
               .setColumnName(columnRangeMetadata.getColumnName())
@@ -516,7 +519,8 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
               .setTotalSize(columnRangeMetadata.getTotalSize())
               
.setTotalUncompressedSize(columnRangeMetadata.getTotalUncompressedSize())
               .setIsDeleted(isDeleted)
-              .build());
+              .build(),
+          MetadataPartitionType.PARTITION_STATS.getRecordType());
 
       return new HoodieAvroRecord<>(key, payload);
     });
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 54b017dde7a..8ec48326552 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -716,6 +716,7 @@ public class HoodieTableMetadataUtil {
 
     if (columnsToIndex.isEmpty()) {
       // In case there are no columns to index, bail
+      LOG.warn("No columns to index for column stats index.");
       return engineContext.emptyHoodieData();
     }
 
@@ -932,6 +933,7 @@ public class HoodieTableMetadataUtil {
             Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)));
     if (columnsToIndex.isEmpty()) {
       // In case there are no columns to index, bail
+      LOG.warn("No columns to index for column stats index.");
       return engineContext.emptyHoodieData();
     }
 
@@ -2046,8 +2048,12 @@ public class HoodieTableMetadataUtil {
                                                                              
List<DirectoryInfo> partitionInfoList,
                                                                              
HoodieMetadataConfig metadataConfig,
                                                                              
HoodieTableMetaClient dataTableMetaClient) {
-    final List<String> columnsToIndex = 
metadataConfig.getColumnsEnabledForColumnStatsIndex();
+    final List<String> columnsToIndex = getColumnsToIndex(
+        metadataConfig.isPartitionStatsIndexEnabled(),
+        metadataConfig.getColumnsEnabledForColumnStatsIndex(),
+        Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
     if (columnsToIndex.isEmpty()) {
+      LOG.warn("No columns to index for partition stats index");
       return engineContext.emptyHoodieData();
     }
     LOG.debug("Indexing following columns for partition stats index: {}", 
columnsToIndex);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index 9c2df411b23..80ec142ea77 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -39,16 +39,12 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
 import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
-import static 
org.apache.hudi.common.config.HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
-import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
-import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
-import static org.apache.hudi.common.util.StringUtils.nonEmpty;
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -123,7 +119,7 @@ public enum MetadataPartitionType {
       HoodieMetadataColumnStats previousColStatsRecord = 
older.getColumnStatMetadata().get();
       HoodieMetadataColumnStats newColumnStatsRecord = 
newer.getColumnStatMetadata().get();
 
-      return new HoodieMetadataPayload(newer.key, 
mergeColumnStatsRecords(previousColStatsRecord, newColumnStatsRecord));
+      return new HoodieMetadataPayload(newer.key, 
mergeColumnStatsRecords(previousColStatsRecord, newColumnStatsRecord), 
getRecordType());
     }
   },
   BLOOM_FILTERS(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS, 
"bloom-filters-", 4) {
@@ -235,13 +231,24 @@ public enum MetadataPartitionType {
   PARTITION_STATS(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, 
"partition-stats-", 6) {
     @Override
     public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
-      return getBooleanWithAltKeys(writeConfig, 
ENABLE_METADATA_INDEX_PARTITION_STATS) && 
nonEmpty(getStringWithAltKeys(writeConfig, COLUMN_STATS_INDEX_FOR_COLUMNS, 
EMPTY_STRING));
+      return getBooleanWithAltKeys(writeConfig, 
ENABLE_METADATA_INDEX_PARTITION_STATS);
     }
 
     @Override
     public void constructMetadataPayload(HoodieMetadataPayload payload, 
GenericRecord record) {
       constructColumnStatsMetadataPayload(payload, record);
     }
+
+    @Override
+    public HoodieMetadataPayload combineMetadataPayloads(HoodieMetadataPayload 
older, HoodieMetadataPayload newer) {
+      checkArgument(older.getColumnStatMetadata().isPresent());
+      checkArgument(newer.getColumnStatMetadata().isPresent());
+
+      HoodieMetadataColumnStats previousColStatsRecord = 
older.getColumnStatMetadata().get();
+      HoodieMetadataColumnStats newColumnStatsRecord = 
newer.getColumnStatMetadata().get();
+
+      return new HoodieMetadataPayload(newer.key, 
mergeColumnStatsRecords(previousColStatsRecord, newColumnStatsRecord), 
getRecordType());
+    }
   },
   // ALL_PARTITIONS is just another record type in FILES partition
   ALL_PARTITIONS(HoodieTableMetadataUtil.PARTITION_NAME_FILES, "files-", 1) {
@@ -411,6 +418,9 @@ public enum MetadataPartitionType {
    * Returns the list of metadata partition types enabled based on the 
metadata config and table config.
    */
   public static List<MetadataPartitionType> 
getEnabledPartitions(TypedProperties writeConfig, HoodieTableMetaClient 
metaClient) {
+    if (!getBooleanWithAltKeys(writeConfig, ENABLE)) {
+      return Collections.emptyList();
+    }
     return Arrays.stream(getValidValues())
         .filter(partitionType -> 
partitionType.isMetadataPartitionEnabled(writeConfig) || 
partitionType.isMetadataPartitionAvailable(metaClient))
         .collect(Collectors.toList());
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 143bfe9e22c..a9bfcb02ba6 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -315,8 +315,9 @@ public class TestStreamWriteOperatorCoordinator {
     reset();
     // override the default configuration
     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    int metadataCompactionDeltaCommits = 5;
     conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
-    conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 5);
+    conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 
metadataCompactionDeltaCommits);
     OperatorCoordinator.Context context = new 
MockOperatorCoordinatorContext(new OperatorID(), 1);
     coordinator = new StreamWriteOperatorCoordinator(conf, context);
     coordinator.start();
@@ -332,23 +333,27 @@ public class TestStreamWriteOperatorCoordinator {
     final String metadataTableBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
     HoodieTableMetaClient metadataTableMetaClient = 
HoodieTestUtils.createMetaClient(new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), 
metadataTableBasePath);
     HoodieTimeline completedTimeline = 
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(1));
+    HoodieTableMetaClient dataTableMetaClient =
+        HoodieTestUtils.createMetaClient(new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), new 
Path(metadataTableBasePath).getParent().getParent().toString());
+    int metadataPartitions = 
dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
+    assertThat("Instants needed to sync to metadata table do not match", 
completedTimeline.countInstants(), is(metadataPartitions));
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), 
startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
 
     // test metadata table compaction
-    // write another 4 commits
-    for (int i = 1; i < 5; i++) {
+    // write a few more commits until compaction
+    int numCommits;
+    for (numCommits = metadataPartitions; numCommits < 
metadataCompactionDeltaCommits; numCommits++) {
       instant = mockWriteWithMetadata();
       metadataTableMetaClient.reloadActiveTimeline();
       completedTimeline = 
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-      assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(i + 1));
+      assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(numCommits + 1));
       assertThat(completedTimeline.lastInstant().get().getTimestamp(), 
is(instant));
     }
     // the 5th commit triggers the compaction
     mockWriteWithMetadata();
     metadataTableMetaClient.reloadActiveTimeline();
     completedTimeline = 
metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
-    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(7));
+    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(numCommits + 2));
     assertThat(completedTimeline.nthFromLastInstant(0).get().getAction(), 
is(HoodieTimeline.COMMIT_ACTION));
     // write another 2 commits
     for (int i = 7; i < 8; i++) {
@@ -401,24 +406,28 @@ public class TestStreamWriteOperatorCoordinator {
     final String metadataTableBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
     HoodieTableMetaClient metadataTableMetaClient = 
HoodieTestUtils.createMetaClient(new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), 
metadataTableBasePath);
     HoodieTimeline completedTimeline = 
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(1));
+    HoodieTableMetaClient dataTableMetaClient =
+        HoodieTestUtils.createMetaClient(new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), new 
Path(metadataTableBasePath).getParent().getParent().toString());
+    int metadataPartitions = 
dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
+    assertThat("Instants needed to sync to metadata table do not match", 
completedTimeline.countInstants(), is(metadataPartitions));
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), 
startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
 
     // test metadata table log compaction
     // already 1 commit is used to initialize FILES partition in MDT
     // write another 4 commits
-    for (int i = 1; i < 5; i++) {
+    int numCommits;
+    for (numCommits = metadataPartitions; numCommits < metadataPartitions + 4; 
numCommits++) {
       instant = mockWriteWithMetadata();
       metadataTableMetaClient.reloadActiveTimeline();
       completedTimeline = 
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-      assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(i + 1));
+      assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(numCommits + 1));
       assertThat(completedTimeline.lastInstant().get().getTimestamp(), 
is(instant));
     }
     // the 5th commit triggers the log compaction
     mockWriteWithMetadata();
     metadataTableMetaClient.reloadActiveTimeline();
     completedTimeline = 
metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
-    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(7));
+    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(numCommits + 2));
     assertThat("The log compaction instant time should be new generated",
         completedTimeline.nthFromLastInstant(1).get().getTimestamp(), 
not(instant));
     // log compaction is another delta commit
@@ -447,7 +456,10 @@ public class TestStreamWriteOperatorCoordinator {
     final String metadataTableBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
     HoodieTableMetaClient metadataTableMetaClient = 
HoodieTestUtils.createMetaClient(new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), 
metadataTableBasePath);
     HoodieTimeline completedTimeline = 
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(1));
+    HoodieTableMetaClient dataTableMetaClient =
+        HoodieTestUtils.createMetaClient(new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), new 
Path(metadataTableBasePath).getParent().getParent().toString());
+    int metadataPartitions = 
dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
+    assertThat("Instants needed to sync to metadata table do not match", 
completedTimeline.countInstants(), is(metadataPartitions));
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), 
startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
 
     // writes a normal commit
@@ -464,7 +476,7 @@ public class TestStreamWriteOperatorCoordinator {
     metadataTableMetaClient.reloadActiveTimeline();
 
     completedTimeline = 
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(4));
+    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(metadataPartitions + 3));
     assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), 
is(instant));
     assertThat("The pending instant should be rolled back first",
         completedTimeline.lastInstant().get().getAction(), 
is(HoodieTimeline.ROLLBACK_ACTION));
@@ -530,13 +542,16 @@ public class TestStreamWriteOperatorCoordinator {
     final String metadataTableBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
     HoodieTableMetaClient metadataTableMetaClient = 
HoodieTestUtils.createMetaClient(new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), 
metadataTableBasePath);
     HoodieTimeline completedTimeline = 
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(1));
+    HoodieTableMetaClient dataTableMetaClient =
+        HoodieTestUtils.createMetaClient(new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), new 
Path(metadataTableBasePath).getParent().getParent().toString());
+    int metadataPartitions = 
dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
+    assertThat("Instants needed to sync to metadata table do not match", 
completedTimeline.countInstants(), is(metadataPartitions));
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), 
startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
 
     instant = mockWriteWithMetadata();
     metadataTableMetaClient.reloadActiveTimeline();
     completedTimeline = 
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(2));
+    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(metadataPartitions + 1));
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), 
is(instant));
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index c8f9029889f..72cc75f6799 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -24,11 +24,11 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.{FileSlice, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.HoodieTestUtils
-import org.apache.hudi.metadata.HoodieMetadataFileSystemView
+import 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionStatsIndexKey
 import org.apache.hudi.metadata.MetadataPartitionType.PARTITION_STATS
+import org.apache.hudi.metadata.{HoodieMetadataFileSystemView, 
MetadataPartitionType}
 import org.apache.hudi.util.JFunction
 import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}
-
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, GreaterThan, LessThan, Literal}
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
@@ -261,43 +261,95 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
              |""".stripMargin
         )
 
+        writeAndValidatePartitionStats(tableName, tablePath)
+      }
+    }
+  }
+
+  test(s"Test partition stats index without configuring columns to index") {
+    Seq("cow", "mor").foreach { tableType =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        // create table and enable partition stats without configuring columns 
to index
         spark.sql(
           s"""
-             | insert into $tableName
-             | values (1, 'a1', 1000, 10), (2, 'a2', 2000, 20), (3, 'a3', 
3000, 30), (4, 'a4', 2000, 10), (5, 'a5', 3000, 20), (6, 'a6', 4000, 30)
-             | """.stripMargin
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price int,
+             |  ts long
+             |) using hudi
+             |partitioned by (ts)
+             |tblproperties (
+             |  type = '$tableType',
+             |  primaryKey = 'id',
+             |  preCombineField = 'price',
+             |  hoodie.metadata.index.partition.stats.enable = 'true'
+             |)
+             |location '$tablePath'
+             |""".stripMargin
         )
 
-        // Validate partition_stats index exists
-        val metaClient = HoodieTableMetaClient.builder()
-          .setBasePath(tablePath)
-          .setConf(HoodieTestUtils.getDefaultStorageConf)
-          .build()
-        assertResult(tableName)(metaClient.getTableConfig.getTableName)
-        
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_STATS.getPartitionPath))
-
-        spark.sql("set hoodie.metadata.enable=true")
-        spark.sql("set hoodie.enable.data.skipping=true")
-        spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
-        checkAnswer(s"select id, name, price, ts from $tableName where 
price>3000")(
-          Seq(6, "a6", 4000, 30)
+        writeAndValidatePartitionStats(tableName, tablePath)
+        // validate partition stats index for id column
+        checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, 
ColumnStatsMetadata.maxValue.member1.value from hudi_metadata('$tableName') 
where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and 
ColumnStatsMetadata.columnName='id'")(
+          Seq(getPartitionStatsIndexKey("ts=10", "id"), 1, 4),
+          Seq(getPartitionStatsIndexKey("ts=20", "id"), 2, 5),
+          Seq(getPartitionStatsIndexKey("ts=30", "id"), 3, 6)
         )
-
-        // Test price update, assert latest value and ensure file pruning
-        spark.sql(s"update $tableName set price = price + 1 where id = 6")
-        checkAnswer(s"select id, name, price, ts from $tableName where 
price>3000")(
-          Seq(6, "a6", 4001, 30)
+        // validate partition stats index for name column
+        checkAnswer(s"select key, ColumnStatsMetadata.minValue.member6.value, 
ColumnStatsMetadata.maxValue.member6.value from hudi_metadata('$tableName') 
where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and 
ColumnStatsMetadata.columnName='name'")(
+          Seq(getPartitionStatsIndexKey("ts=10", "name"), "a1", "a4"),
+          Seq(getPartitionStatsIndexKey("ts=20", "name"), "a2", "a5"),
+          Seq(getPartitionStatsIndexKey("ts=30", "name"), "a3", "a6")
         )
-
-        verifyFilePruning(
-          Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", 
HoodieMetadataConfig.ENABLE.key -> "true"),
-          GreaterThan(AttributeReference("price", IntegerType)(), 
Literal(3000)),
-          HoodieTableMetaClient.reload(metaClient),
-          isDataSkippingExpected = true)
       }
     }
   }
 
+  private def writeAndValidatePartitionStats(tableName: String, tablePath: 
String): Unit = {
+    spark.sql(
+      s"""
+         | insert into $tableName
+         | values (1, 'a1', 1000, 10), (2, 'a2', 2000, 20), (3, 'a3', 3000, 
30), (4, 'a4', 2000, 10), (5, 'a5', 3000, 20), (6, 'a6', 4000, 30)
+         | """.stripMargin
+    )
+
+    // Validate partition_stats index exists
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(tablePath)
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .build()
+    assertResult(tableName)(metaClient.getTableConfig.getTableName)
+    
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_STATS.getPartitionPath))
+
+    spark.sql("set hoodie.metadata.enable=true")
+    spark.sql("set hoodie.enable.data.skipping=true")
+    spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+    checkAnswer(s"select id, name, price, ts from $tableName where 
price>3000")(
+      Seq(6, "a6", 4000, 30)
+    )
+
+    // Test price update, assert latest value and ensure file pruning
+    spark.sql(s"update $tableName set price = price + 1 where id = 6")
+    checkAnswer(s"select id, name, price, ts from $tableName where 
price>3000")(
+      Seq(6, "a6", 4001, 30)
+    )
+
+    verifyFilePruning(
+      Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", 
HoodieMetadataConfig.ENABLE.key -> "true"),
+      GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)),
+      HoodieTableMetaClient.reload(metaClient),
+      isDataSkippingExpected = true)
+
+    verifyFilePruning(
+      Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false", 
HoodieMetadataConfig.ENABLE.key -> "true"),
+      GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)),
+      HoodieTableMetaClient.reload(metaClient),
+      isDataSkippingExpected = false)
+  }
+
   private def verifyFilePruning(opts: Map[String, String], dataFilter: 
Expression, metaClient: HoodieTableMetaClient, isDataSkippingExpected: Boolean, 
isNoScanExpected: Boolean = false): Unit = {
     // with data skipping
     val commonOpts = opts + ("path" -> metaClient.getBasePath.toString)
@@ -310,6 +362,8 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
         assertTrue(filteredFilesCount < latestDataFilesCount)
         if (isNoScanExpected) {
           assertTrue(filteredFilesCount == 0)
+        } else {
+          assertTrue(filteredFilesCount > 0)
         }
       } else {
         assertTrue(filteredFilesCount == latestDataFilesCount)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
index e41d05138d5..1cef283eaa2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.dml
 import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION
 import org.apache.hudi.HoodieSparkUtils
 import 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionStatsIndexKey
+import org.apache.hudi.metadata.MetadataPartitionType
 import org.apache.spark.sql.functions.{col, from_json}
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 
@@ -599,22 +600,22 @@ class TestHoodieTableValuedFunction extends 
HoodieSparkSqlTestBase {
           )
 
           val result2DF = spark.sql(
-            s"select type, key, filesystemmetadata from 
hudi_metadata('$identifier') where type=1"
+            s"select type, key, filesystemmetadata from 
hudi_metadata('$identifier') where 
type=${MetadataPartitionType.ALL_PARTITIONS.getRecordType}"
           )
           assert(result2DF.count() == 1)
 
           val result3DF = spark.sql(
-            s"select type, key, filesystemmetadata from 
hudi_metadata('$identifier') where type=2"
+            s"select type, key, filesystemmetadata from 
hudi_metadata('$identifier') where 
type=${MetadataPartitionType.FILES.getRecordType}"
           )
           assert(result3DF.count() == 3)
 
           val result4DF = spark.sql(
-            s"select type, key, ColumnStatsMetadata from 
hudi_metadata('$identifier') where type=3 or type=6"
+            s"select type, key, ColumnStatsMetadata from 
hudi_metadata('$identifier') where 
type=${MetadataPartitionType.COLUMN_STATS.getRecordType}"
           )
-          assert(result4DF.count() == 6)
+          assert(result4DF.count() == 3)
 
           val result5DF = spark.sql(
-            s"select type, key, recordIndexMetadata from 
hudi_metadata('$identifier') where type=5"
+            s"select type, key, recordIndexMetadata from 
hudi_metadata('$identifier') where 
type=${MetadataPartitionType.RECORD_INDEX.getRecordType}"
           )
           assert(result5DF.count() == 3)
 
@@ -622,6 +623,12 @@ class TestHoodieTableValuedFunction extends 
HoodieSparkSqlTestBase {
             s"select type, key, BloomFilterMetadata from 
hudi_metadata('$identifier') where BloomFilterMetadata is not null"
           )
           assert(result6DF.count() == 0)
+
+          // no partition stats by default
+          val result7DF = spark.sql(
+            s"select type, key, ColumnStatsMetadata from 
hudi_metadata('$identifier') where 
type=${MetadataPartitionType.PARTITION_STATS.getRecordType}"
+          )
+          assert(result7DF.count() == 0)
         }
       }
     }
@@ -664,25 +671,25 @@ class TestHoodieTableValuedFunction extends 
HoodieSparkSqlTestBase {
           )
 
           val result2DF = spark.sql(
-            s"select type, key, filesystemmetadata from 
hudi_metadata('$identifier') where type=1"
+            s"select type, key, filesystemmetadata from 
hudi_metadata('$identifier') where 
type=${MetadataPartitionType.ALL_PARTITIONS.getRecordType}"
           )
           assert(result2DF.count() == 1)
 
           val result3DF = spark.sql(
-            s"select type, key, filesystemmetadata from 
hudi_metadata('$identifier') where type=2"
+            s"select type, key, filesystemmetadata from 
hudi_metadata('$identifier') where 
type=${MetadataPartitionType.FILES.getRecordType}"
           )
           assert(result3DF.count() == 3)
 
           val result4DF = spark.sql(
-            s"select * from hudi_metadata('$identifier') where type=3"
+            s"select * from hudi_metadata('$identifier') where 
type=${MetadataPartitionType.PARTITION_STATS.getRecordType}"
           )
           assert(result4DF.count() == 3)
-          checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value 
from hudi_metadata('$identifier') where type=3")(
+          checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value 
from hudi_metadata('$identifier') where 
type=${MetadataPartitionType.PARTITION_STATS.getRecordType}")(
             Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000),
             Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000),
             Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000)
           )
-          checkAnswer(s"select key, ColumnStatsMetadata.maxValue.member1.value 
from hudi_metadata('$identifier') where type=3")(
+          checkAnswer(s"select key, ColumnStatsMetadata.maxValue.member1.value 
from hudi_metadata('$identifier') where 
type=${MetadataPartitionType.PARTITION_STATS.getRecordType}")(
             Seq(getPartitionStatsIndexKey("ts=10", "price"), 2000),
             Seq(getPartitionStatsIndexKey("ts=20", "price"), 3000),
             Seq(getPartitionStatsIndexKey("ts=30", "price"), 4000)


Reply via email to