This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 46d2a30dffe [HUDI-7958] Create partition stats index for all columns when no cols specified (#11579)
46d2a30dffe is described below
commit 46d2a30dffed6db5d8f02bfe830ee77c3623c3d7
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Sep 6 06:22:21 2024 +0530
[HUDI-7958] Create partition stats index for all columns when no cols specified (#11579)
---
.../metadata/HoodieBackedTableMetadataWriter.java | 4 -
.../hudi/common/config/HoodieMetadataConfig.java | 2 +-
.../hudi/metadata/HoodieMetadataPayload.java | 16 +--
.../hudi/metadata/HoodieTableMetadataUtil.java | 8 +-
.../hudi/metadata/MetadataPartitionType.java | 22 ++--
.../sink/TestStreamWriteOperatorCoordinator.java | 43 +++++---
.../TestPartitionStatsIndexWithSql.scala | 112 +++++++++++++++------
.../hudi/dml/TestHoodieTableValuedFunction.scala | 27 +++--
8 files changed, 163 insertions(+), 71 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 178c1facf22..e9657e7145d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -406,10 +406,6 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
fileGroupCountAndRecordsPair =
initializeFunctionalIndexPartition(functionalIndexPartitionsToInit.iterator().next());
break;
case PARTITION_STATS:
- if
(dataWriteConfig.getColumnsEnabledForColumnStatsIndex().isEmpty()) {
- LOG.warn("Skipping partition stats index initialization as
target columns are not set");
- continue;
- }
fileGroupCountAndRecordsPair =
initializePartitionStatsIndex(partitionInfoList);
break;
case SECONDARY_INDEX:
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 7834a48f674..40e2ab94d41 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -332,7 +332,7 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
public static final ConfigProperty<Boolean>
ENABLE_METADATA_INDEX_PARTITION_STATS = ConfigProperty
.key(METADATA_PREFIX + ".index.partition.stats.enable")
- .defaultValue(true)
+ .defaultValue(false)
.sinceVersion("1.0.0")
.withDocumentation("Enable aggregating stats for each column at the
storage partition level.");
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index e4a3420aeba..d86f6c101ee 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -212,8 +212,8 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
this(key, MetadataPartitionType.BLOOM_FILTERS.getRecordType(), null,
metadataBloomFilter, null, null, null);
}
- protected HoodieMetadataPayload(String key, HoodieMetadataColumnStats
columnStats) {
- this(key, MetadataPartitionType.COLUMN_STATS.getRecordType(), null, null,
columnStats, null, null);
+ protected HoodieMetadataPayload(String key, HoodieMetadataColumnStats
columnStats, int recordType) {
+ this(key, recordType, null, null, columnStats, null, null);
}
private HoodieMetadataPayload(String key, HoodieRecordIndexInfo
recordIndexMetadata) {
@@ -482,7 +482,8 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
HoodieKey key = new HoodieKey(getColumnStatsIndexKey(partitionName,
columnRangeMetadata),
MetadataPartitionType.COLUMN_STATS.getPartitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(),
+ HoodieMetadataPayload payload = new HoodieMetadataPayload(
+ key.getRecordKey(),
HoodieMetadataColumnStats.newBuilder()
.setFileName(new
StoragePath(columnRangeMetadata.getFilePath()).getName())
.setColumnName(columnRangeMetadata.getColumnName())
@@ -493,7 +494,8 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
.setTotalSize(columnRangeMetadata.getTotalSize())
.setTotalUncompressedSize(columnRangeMetadata.getTotalUncompressedSize())
.setIsDeleted(isDeleted)
- .build());
+ .build(),
+ MetadataPartitionType.COLUMN_STATS.getRecordType());
return new HoodieAvroRecord<>(key, payload);
});
@@ -505,7 +507,8 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
HoodieKey key = new HoodieKey(getPartitionStatsIndexKey(partitionPath,
columnRangeMetadata.getColumnName()),
MetadataPartitionType.PARTITION_STATS.getPartitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(),
+ HoodieMetadataPayload payload = new HoodieMetadataPayload(
+ key.getRecordKey(),
HoodieMetadataColumnStats.newBuilder()
.setFileName(columnRangeMetadata.getFilePath())
.setColumnName(columnRangeMetadata.getColumnName())
@@ -516,7 +519,8 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
.setTotalSize(columnRangeMetadata.getTotalSize())
.setTotalUncompressedSize(columnRangeMetadata.getTotalUncompressedSize())
.setIsDeleted(isDeleted)
- .build());
+ .build(),
+ MetadataPartitionType.PARTITION_STATS.getRecordType());
return new HoodieAvroRecord<>(key, payload);
});
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 54b017dde7a..8ec48326552 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -716,6 +716,7 @@ public class HoodieTableMetadataUtil {
if (columnsToIndex.isEmpty()) {
// In case there are no columns to index, bail
+ LOG.warn("No columns to index for column stats index.");
return engineContext.emptyHoodieData();
}
@@ -932,6 +933,7 @@ public class HoodieTableMetadataUtil {
Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)));
if (columnsToIndex.isEmpty()) {
// In case there are no columns to index, bail
+ LOG.warn("No columns to index for column stats index.");
return engineContext.emptyHoodieData();
}
@@ -2046,8 +2048,12 @@ public class HoodieTableMetadataUtil {
List<DirectoryInfo> partitionInfoList,
HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataTableMetaClient) {
- final List<String> columnsToIndex =
metadataConfig.getColumnsEnabledForColumnStatsIndex();
+ final List<String> columnsToIndex = getColumnsToIndex(
+ metadataConfig.isPartitionStatsIndexEnabled(),
+ metadataConfig.getColumnsEnabledForColumnStatsIndex(),
+ Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
if (columnsToIndex.isEmpty()) {
+ LOG.warn("No columns to index for partition stats index");
return engineContext.emptyHoodieData();
}
LOG.debug("Indexing following columns for partition stats index: {}",
columnsToIndex);
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index 9c2df411b23..80ec142ea77 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -39,16 +39,12 @@ import java.util.stream.Collectors;
import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
-import static
org.apache.hudi.common.config.HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS;
import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
-import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
-import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
-import static org.apache.hudi.common.util.StringUtils.nonEmpty;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -123,7 +119,7 @@ public enum MetadataPartitionType {
HoodieMetadataColumnStats previousColStatsRecord =
older.getColumnStatMetadata().get();
HoodieMetadataColumnStats newColumnStatsRecord =
newer.getColumnStatMetadata().get();
- return new HoodieMetadataPayload(newer.key,
mergeColumnStatsRecords(previousColStatsRecord, newColumnStatsRecord));
+ return new HoodieMetadataPayload(newer.key,
mergeColumnStatsRecords(previousColStatsRecord, newColumnStatsRecord),
getRecordType());
}
},
BLOOM_FILTERS(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS,
"bloom-filters-", 4) {
@@ -235,13 +231,24 @@ public enum MetadataPartitionType {
PARTITION_STATS(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS,
"partition-stats-", 6) {
@Override
public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
- return getBooleanWithAltKeys(writeConfig,
ENABLE_METADATA_INDEX_PARTITION_STATS) &&
nonEmpty(getStringWithAltKeys(writeConfig, COLUMN_STATS_INDEX_FOR_COLUMNS,
EMPTY_STRING));
+ return getBooleanWithAltKeys(writeConfig,
ENABLE_METADATA_INDEX_PARTITION_STATS);
}
@Override
public void constructMetadataPayload(HoodieMetadataPayload payload,
GenericRecord record) {
constructColumnStatsMetadataPayload(payload, record);
}
+
+ @Override
+ public HoodieMetadataPayload combineMetadataPayloads(HoodieMetadataPayload
older, HoodieMetadataPayload newer) {
+ checkArgument(older.getColumnStatMetadata().isPresent());
+ checkArgument(newer.getColumnStatMetadata().isPresent());
+
+ HoodieMetadataColumnStats previousColStatsRecord =
older.getColumnStatMetadata().get();
+ HoodieMetadataColumnStats newColumnStatsRecord =
newer.getColumnStatMetadata().get();
+
+ return new HoodieMetadataPayload(newer.key,
mergeColumnStatsRecords(previousColStatsRecord, newColumnStatsRecord),
getRecordType());
+ }
},
// ALL_PARTITIONS is just another record type in FILES partition
ALL_PARTITIONS(HoodieTableMetadataUtil.PARTITION_NAME_FILES, "files-", 1) {
@@ -411,6 +418,9 @@ public enum MetadataPartitionType {
* Returns the list of metadata partition types enabled based on the
metadata config and table config.
*/
public static List<MetadataPartitionType>
getEnabledPartitions(TypedProperties writeConfig, HoodieTableMetaClient
metaClient) {
+ if (!getBooleanWithAltKeys(writeConfig, ENABLE)) {
+ return Collections.emptyList();
+ }
return Arrays.stream(getValidValues())
.filter(partitionType ->
partitionType.isMetadataPartitionEnabled(writeConfig) ||
partitionType.isMetadataPartitionAvailable(metaClient))
.collect(Collectors.toList());
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 143bfe9e22c..a9bfcb02ba6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -315,8 +315,9 @@ public class TestStreamWriteOperatorCoordinator {
reset();
// override the default configuration
Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ int metadataCompactionDeltaCommits = 5;
conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
- conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 5);
+ conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS,
metadataCompactionDeltaCommits);
OperatorCoordinator.Context context = new
MockOperatorCoordinatorContext(new OperatorID(), 1);
coordinator = new StreamWriteOperatorCoordinator(conf, context);
coordinator.start();
@@ -332,23 +333,27 @@ public class TestStreamWriteOperatorCoordinator {
final String metadataTableBasePath =
HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
HoodieTableMetaClient metadataTableMetaClient =
HoodieTestUtils.createMetaClient(new
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)),
metadataTableBasePath);
HoodieTimeline completedTimeline =
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
- assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(1));
+ HoodieTableMetaClient dataTableMetaClient =
+ HoodieTestUtils.createMetaClient(new
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), new
Path(metadataTableBasePath).getParent().getParent().toString());
+ int metadataPartitions =
dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
+ assertThat("Instants needed to sync to metadata table do not match",
completedTimeline.countInstants(), is(metadataPartitions));
assertThat(completedTimeline.lastInstant().get().getTimestamp(),
startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
// test metadata table compaction
- // write another 4 commits
- for (int i = 1; i < 5; i++) {
+ // write few more commits until compaction
+ int numCommits;
+ for (numCommits = metadataPartitions; numCommits <
metadataCompactionDeltaCommits; numCommits++) {
instant = mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline =
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
- assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(i + 1));
+ assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(numCommits + 1));
assertThat(completedTimeline.lastInstant().get().getTimestamp(),
is(instant));
}
// the 5th commit triggers the compaction
mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline =
metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
- assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(7));
+ assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(numCommits + 2));
assertThat(completedTimeline.nthFromLastInstant(0).get().getAction(),
is(HoodieTimeline.COMMIT_ACTION));
// write another 2 commits
for (int i = 7; i < 8; i++) {
@@ -401,24 +406,28 @@ public class TestStreamWriteOperatorCoordinator {
final String metadataTableBasePath =
HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
HoodieTableMetaClient metadataTableMetaClient =
HoodieTestUtils.createMetaClient(new
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)),
metadataTableBasePath);
HoodieTimeline completedTimeline =
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
- assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(1));
+ HoodieTableMetaClient dataTableMetaClient =
+ HoodieTestUtils.createMetaClient(new
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), new
Path(metadataTableBasePath).getParent().getParent().toString());
+ int metadataPartitions =
dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
+ assertThat("Instants needed to sync to metadata table do not match",
completedTimeline.countInstants(), is(metadataPartitions));
assertThat(completedTimeline.lastInstant().get().getTimestamp(),
startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
// test metadata table log compaction
// already 1 commit is used to initialized FILES partition in MDT
// write another 4 commits
- for (int i = 1; i < 5; i++) {
+ int numCommits;
+ for (numCommits = metadataPartitions; numCommits < metadataPartitions + 4;
numCommits++) {
instant = mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline =
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
- assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(i + 1));
+ assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(numCommits + 1));
assertThat(completedTimeline.lastInstant().get().getTimestamp(),
is(instant));
}
// the 5th commit triggers the log compaction
mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline =
metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
- assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(7));
+ assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(numCommits + 2));
assertThat("The log compaction instant time should be new generated",
completedTimeline.nthFromLastInstant(1).get().getTimestamp(),
not(instant));
// log compaction is another delta commit
@@ -447,7 +456,10 @@ public class TestStreamWriteOperatorCoordinator {
final String metadataTableBasePath =
HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
HoodieTableMetaClient metadataTableMetaClient =
HoodieTestUtils.createMetaClient(new
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)),
metadataTableBasePath);
HoodieTimeline completedTimeline =
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
- assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(1));
+ HoodieTableMetaClient dataTableMetaClient =
+ HoodieTestUtils.createMetaClient(new
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), new
Path(metadataTableBasePath).getParent().getParent().toString());
+ int metadataPartitions =
dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
+ assertThat("Instants needed to sync to metadata table do not match",
completedTimeline.countInstants(), is(metadataPartitions));
assertThat(completedTimeline.lastInstant().get().getTimestamp(),
startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
// writes a normal commit
@@ -464,7 +476,7 @@ public class TestStreamWriteOperatorCoordinator {
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline =
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
- assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(4));
+ assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(metadataPartitions + 3));
assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(),
is(instant));
assertThat("The pending instant should be rolled back first",
completedTimeline.lastInstant().get().getAction(),
is(HoodieTimeline.ROLLBACK_ACTION));
@@ -530,13 +542,16 @@ public class TestStreamWriteOperatorCoordinator {
final String metadataTableBasePath =
HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
HoodieTableMetaClient metadataTableMetaClient =
HoodieTestUtils.createMetaClient(new
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)),
metadataTableBasePath);
HoodieTimeline completedTimeline =
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
- assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(1));
+ HoodieTableMetaClient dataTableMetaClient =
+ HoodieTestUtils.createMetaClient(new
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), new
Path(metadataTableBasePath).getParent().getParent().toString());
+ int metadataPartitions =
dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
+ assertThat("Instants needed to sync to metadata table do not match",
completedTimeline.countInstants(), is(metadataPartitions));
assertThat(completedTimeline.lastInstant().get().getTimestamp(),
startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
instant = mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline =
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
- assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(2));
+ assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(metadataPartitions + 1));
assertThat(completedTimeline.lastInstant().get().getTimestamp(),
is(instant));
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index c8f9029889f..72cc75f6799 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -24,11 +24,11 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.{FileSlice, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.HoodieTestUtils
-import org.apache.hudi.metadata.HoodieMetadataFileSystemView
+import
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionStatsIndexKey
import org.apache.hudi.metadata.MetadataPartitionType.PARTITION_STATS
+import org.apache.hudi.metadata.{HoodieMetadataFileSystemView,
MetadataPartitionType}
import org.apache.hudi.util.JFunction
import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}
-
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression, GreaterThan, LessThan, Literal}
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
@@ -261,43 +261,95 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
|""".stripMargin
)
+ writeAndValidatePartitionStats(tableName, tablePath)
+ }
+ }
+ }
+
+ test(s"Test partition stats index without configuring columns to index") {
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ // create table and enable partition stats without configuring columns
to index
spark.sql(
s"""
- | insert into $tableName
- | values (1, 'a1', 1000, 10), (2, 'a2', 2000, 20), (3, 'a3',
3000, 30), (4, 'a4', 2000, 10), (5, 'a5', 3000, 20), (6, 'a6', 4000, 30)
- | """.stripMargin
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price int,
+ | ts long
+ |) using hudi
+ |partitioned by (ts)
+ |tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'price',
+ | hoodie.metadata.index.partition.stats.enable = 'true'
+ |)
+ |location '$tablePath'
+ |""".stripMargin
)
- // Validate partition_stats index exists
- val metaClient = HoodieTableMetaClient.builder()
- .setBasePath(tablePath)
- .setConf(HoodieTestUtils.getDefaultStorageConf)
- .build()
- assertResult(tableName)(metaClient.getTableConfig.getTableName)
-
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_STATS.getPartitionPath))
-
- spark.sql("set hoodie.metadata.enable=true")
- spark.sql("set hoodie.enable.data.skipping=true")
- spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
- checkAnswer(s"select id, name, price, ts from $tableName where
price>3000")(
- Seq(6, "a6", 4000, 30)
+ writeAndValidatePartitionStats(tableName, tablePath)
+ // validate partition stats index for id column
+ checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value,
ColumnStatsMetadata.maxValue.member1.value from hudi_metadata('$tableName')
where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and
ColumnStatsMetadata.columnName='id'")(
+ Seq(getPartitionStatsIndexKey("ts=10", "id"), 1, 4),
+ Seq(getPartitionStatsIndexKey("ts=20", "id"), 2, 5),
+ Seq(getPartitionStatsIndexKey("ts=30", "id"), 3, 6)
)
-
- // Test price update, assert latest value and ensure file pruning
- spark.sql(s"update $tableName set price = price + 1 where id = 6")
- checkAnswer(s"select id, name, price, ts from $tableName where
price>3000")(
- Seq(6, "a6", 4001, 30)
+ // validate partition stats index for name column
+ checkAnswer(s"select key, ColumnStatsMetadata.minValue.member6.value,
ColumnStatsMetadata.maxValue.member6.value from hudi_metadata('$tableName')
where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and
ColumnStatsMetadata.columnName='name'")(
+ Seq(getPartitionStatsIndexKey("ts=10", "name"), "a1", "a4"),
+ Seq(getPartitionStatsIndexKey("ts=20", "name"), "a2", "a5"),
+ Seq(getPartitionStatsIndexKey("ts=30", "name"), "a3", "a6")
)
-
- verifyFilePruning(
- Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
HoodieMetadataConfig.ENABLE.key -> "true"),
- GreaterThan(AttributeReference("price", IntegerType)(),
Literal(3000)),
- HoodieTableMetaClient.reload(metaClient),
- isDataSkippingExpected = true)
}
}
}
+ private def writeAndValidatePartitionStats(tableName: String, tablePath:
String): Unit = {
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values (1, 'a1', 1000, 10), (2, 'a2', 2000, 20), (3, 'a3', 3000,
30), (4, 'a4', 2000, 10), (5, 'a5', 3000, 20), (6, 'a6', 4000, 30)
+ | """.stripMargin
+ )
+
+ // Validate partition_stats index exists
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(tablePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+ assertResult(tableName)(metaClient.getTableConfig.getTableName)
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_STATS.getPartitionPath))
+
+ spark.sql("set hoodie.metadata.enable=true")
+ spark.sql("set hoodie.enable.data.skipping=true")
+ spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+ checkAnswer(s"select id, name, price, ts from $tableName where
price>3000")(
+ Seq(6, "a6", 4000, 30)
+ )
+
+ // Test price update, assert latest value and ensure file pruning
+ spark.sql(s"update $tableName set price = price + 1 where id = 6")
+ checkAnswer(s"select id, name, price, ts from $tableName where
price>3000")(
+ Seq(6, "a6", 4001, 30)
+ )
+
+ verifyFilePruning(
+ Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
HoodieMetadataConfig.ENABLE.key -> "true"),
+ GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)),
+ HoodieTableMetaClient.reload(metaClient),
+ isDataSkippingExpected = true)
+
+ verifyFilePruning(
+ Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false",
HoodieMetadataConfig.ENABLE.key -> "true"),
+ GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)),
+ HoodieTableMetaClient.reload(metaClient),
+ isDataSkippingExpected = false)
+ }
+
private def verifyFilePruning(opts: Map[String, String], dataFilter:
Expression, metaClient: HoodieTableMetaClient, isDataSkippingExpected: Boolean,
isNoScanExpected: Boolean = false): Unit = {
// with data skipping
val commonOpts = opts + ("path" -> metaClient.getBasePath.toString)
@@ -310,6 +362,8 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
assertTrue(filteredFilesCount < latestDataFilesCount)
if (isNoScanExpected) {
assertTrue(filteredFilesCount == 0)
+ } else {
+ assertTrue(filteredFilesCount > 0)
}
} else {
assertTrue(filteredFilesCount == latestDataFilesCount)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
index e41d05138d5..1cef283eaa2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.dml
import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION
import org.apache.hudi.HoodieSparkUtils
import
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionStatsIndexKey
+import org.apache.hudi.metadata.MetadataPartitionType
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
@@ -599,22 +600,22 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
)
val result2DF = spark.sql(
- s"select type, key, filesystemmetadata from
hudi_metadata('$identifier') where type=1"
+ s"select type, key, filesystemmetadata from
hudi_metadata('$identifier') where
type=${MetadataPartitionType.ALL_PARTITIONS.getRecordType}"
)
assert(result2DF.count() == 1)
val result3DF = spark.sql(
- s"select type, key, filesystemmetadata from
hudi_metadata('$identifier') where type=2"
+ s"select type, key, filesystemmetadata from
hudi_metadata('$identifier') where
type=${MetadataPartitionType.FILES.getRecordType}"
)
assert(result3DF.count() == 3)
val result4DF = spark.sql(
- s"select type, key, ColumnStatsMetadata from
hudi_metadata('$identifier') where type=3 or type=6"
+ s"select type, key, ColumnStatsMetadata from
hudi_metadata('$identifier') where
type=${MetadataPartitionType.COLUMN_STATS.getRecordType}"
)
- assert(result4DF.count() == 6)
+ assert(result4DF.count() == 3)
val result5DF = spark.sql(
- s"select type, key, recordIndexMetadata from
hudi_metadata('$identifier') where type=5"
+ s"select type, key, recordIndexMetadata from
hudi_metadata('$identifier') where
type=${MetadataPartitionType.RECORD_INDEX.getRecordType}"
)
assert(result5DF.count() == 3)
@@ -622,6 +623,12 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
s"select type, key, BloomFilterMetadata from
hudi_metadata('$identifier') where BloomFilterMetadata is not null"
)
assert(result6DF.count() == 0)
+
+ // no partition stats by default
+ val result7DF = spark.sql(
+ s"select type, key, ColumnStatsMetadata from
hudi_metadata('$identifier') where
type=${MetadataPartitionType.PARTITION_STATS.getRecordType}"
+ )
+ assert(result7DF.count() == 0)
}
}
}
@@ -664,25 +671,25 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
)
val result2DF = spark.sql(
- s"select type, key, filesystemmetadata from
hudi_metadata('$identifier') where type=1"
+ s"select type, key, filesystemmetadata from
hudi_metadata('$identifier') where
type=${MetadataPartitionType.ALL_PARTITIONS.getRecordType}"
)
assert(result2DF.count() == 1)
val result3DF = spark.sql(
- s"select type, key, filesystemmetadata from
hudi_metadata('$identifier') where type=2"
+ s"select type, key, filesystemmetadata from
hudi_metadata('$identifier') where
type=${MetadataPartitionType.FILES.getRecordType}"
)
assert(result3DF.count() == 3)
val result4DF = spark.sql(
- s"select * from hudi_metadata('$identifier') where type=3"
+ s"select * from hudi_metadata('$identifier') where
type=${MetadataPartitionType.PARTITION_STATS.getRecordType}"
)
assert(result4DF.count() == 3)
- checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value
from hudi_metadata('$identifier') where type=3")(
+ checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value
from hudi_metadata('$identifier') where
type=${MetadataPartitionType.PARTITION_STATS.getRecordType}")(
Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000),
Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000),
Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000)
)
- checkAnswer(s"select key, ColumnStatsMetadata.maxValue.member1.value
from hudi_metadata('$identifier') where type=3")(
+ checkAnswer(s"select key, ColumnStatsMetadata.maxValue.member1.value
from hudi_metadata('$identifier') where
type=${MetadataPartitionType.PARTITION_STATS.getRecordType}")(
Seq(getPartitionStatsIndexKey("ts=10", "price"), 2000),
Seq(getPartitionStatsIndexKey("ts=20", "price"), 3000),
Seq(getPartitionStatsIndexKey("ts=30", "price"), 4000)