This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 898eb1c98db [HUDI-8556] Add config for max number of colstats columns
(#12310)
898eb1c98db is described below
commit 898eb1c98db6740931de94c06c8377ad449dd69c
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Nov 29 02:49:44 2024 -0500
[HUDI-8556] Add config for max number of colstats columns (#12310)
- Adding config to control the number of columns to index with col stats.
---------
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: sivabalan <[email protected]>
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 -
.../org/apache/hudi/io/HoodieAppendHandle.java | 22 +--
.../metadata/HoodieBackedTableMetadataWriter.java | 21 +--
.../hudi/common/config/HoodieMetadataConfig.java | 17 ++
.../hudi/metadata/HoodieTableMetadataUtil.java | 168 +++++++++++-------
.../org/apache/hudi/avro/TestHoodieAvroUtils.java | 2 +-
.../hudi/metadata/TestHoodieTableMetadataUtil.java | 193 ++++++++++++++++++++-
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 17 +-
.../TestDataSkippingWithMORColstats.java | 4 +
.../column-stats-index-table-short-schema.json | 4 +
...ted2-column-stats-index-table-short-schema.json | 13 ++
...ted2-column-stats-index-table-short-schema.json | 13 ++
...ated-column-stats-index-table-short-schema.json | 8 +
.../hudi/functional/ColumnStatIndexTestBase.scala | 27 +--
.../hudi/functional/TestColumnStatsIndex.scala | 26 ++-
.../functional/TestColumnStatsIndexWithSQL.scala | 44 ++++-
.../apache/hudi/functional/TestMORDataSource.scala | 3 +-
.../hudi/dml/TestHoodieTableValuedFunction.scala | 6 +-
.../utilities/HoodieMetadataTableValidator.java | 3 +-
19 files changed, 448 insertions(+), 147 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 0dfcc66833b..8af086c560d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2046,10 +2046,6 @@ public class HoodieWriteConfig extends HoodieConfig {
return isMetadataTableEnabled() &&
getMetadataConfig().isColumnStatsIndexEnabled();
}
- public List<String> getColumnsEnabledForColumnStatsIndex() {
- return getMetadataConfig().getColumnsEnabledForColumnStatsIndex();
- }
-
public boolean isPartitionStatsIndexEnabled() {
return isMetadataTableEnabled() &&
getMetadataConfig().isPartitionStatsIndexEnabled();
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 1891ec237a8..46dc9ad55bc 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -57,8 +57,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.util.Lazy;
import org.apache.avro.Schema;
import org.slf4j.Logger;
@@ -432,19 +434,13 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
}
if (config.isMetadataColumnStatsIndexEnabled()) {
- final List<Schema.Field> fieldsToIndex;
- // If column stats index is enabled but columns not configured then we
assume that
- // all columns should be indexed
- if (config.getColumnsEnabledForColumnStatsIndex().isEmpty()) {
- fieldsToIndex = writeSchemaWithMetaFields.getFields();
- } else {
- Set<String> columnsToIndexSet = new
HashSet<>(config.getColumnsEnabledForColumnStatsIndex());
-
- fieldsToIndex = writeSchemaWithMetaFields.getFields().stream()
- .filter(field -> columnsToIndexSet.contains(field.name()))
- .collect(Collectors.toList());
- }
-
+ Set<String> columnsToIndexSet = new HashSet<>(HoodieTableMetadataUtil
+ .getColumnsToIndex(hoodieTable.getMetaClient().getTableConfig(),
+ config.getMetadataConfig(),
Lazy.eagerly(Option.of(writeSchemaWithMetaFields)),
+ Option.of(this.recordMerger.getRecordType())));
+ final List<Schema.Field> fieldsToIndex =
writeSchemaWithMetaFields.getFields().stream()
+ .filter(field -> columnsToIndexSet.contains(field.name()))
+ .collect(Collectors.toList());
try {
Map<String, HoodieColumnRangeMetadata<Comparable>>
columnRangeMetadataMap =
collectColumnRangeMetadata(recordList, fieldsToIndex,
stat.getPath(), writeSchemaWithMetaFields);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 71730c676d8..34ebdea906c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -512,8 +512,8 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
private Pair<Integer, HoodieData<HoodieRecord>>
initializeColumnStatsPartition(Map<String, Map<String, Long>>
partitionToFilesMap) {
// during initialization, we need stats for base and log files.
HoodieData<HoodieRecord> records =
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
- engineContext, Collections.emptyMap(), partitionToFilesMap,
dataMetaClient, dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
- dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
+ engineContext, Collections.emptyMap(), partitionToFilesMap,
dataMetaClient, dataWriteConfig.getMetadataConfig(),
+ dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize());
final int fileGroupCount =
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
@@ -1050,10 +1050,9 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
HoodieTableMetadataUtil.convertMetadataToRecords(
engineContext, dataWriteConfig, commitMetadata, instantTime,
dataMetaClient,
+ dataWriteConfig.getMetadataConfig(),
enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
- dataWriteConfig.getBloomIndexParallelism(),
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
- dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
dataWriteConfig.getWritesFileIdEncoding(),
- dataWriteConfig.getMetadataConfig());
+ dataWriteConfig.getBloomIndexParallelism(),
dataWriteConfig.getWritesFileIdEncoding());
// Updates for record index are created by parsing the WriteStatus which
is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
@@ -1073,11 +1072,8 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
processAndCommit(instantTime, () -> {
Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
HoodieTableMetadataUtil.convertMetadataToRecords(
- engineContext, dataWriteConfig, commitMetadata, instantTime,
dataMetaClient,
- enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
- dataWriteConfig.getBloomIndexParallelism(),
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
- dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
- dataWriteConfig.getWritesFileIdEncoding(),
dataWriteConfig.getMetadataConfig());
+ engineContext, dataWriteConfig, commitMetadata, instantTime,
dataMetaClient, dataWriteConfig.getMetadataConfig(),
+ enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
dataWriteConfig.getBloomIndexParallelism(),
dataWriteConfig.getWritesFileIdEncoding());
HoodieData<HoodieRecord> additionalUpdates =
getRecordIndexAdditionalUpserts(records, commitMetadata);
partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(),
records.union(additionalUpdates));
updateExpressionIndexIfPresent(commitMetadata, instantTime,
partitionToRecordMap);
@@ -1213,9 +1209,8 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
@Override
public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
processAndCommit(instantTime, () ->
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
- cleanMetadata, instantTime, dataMetaClient, enabledPartitionTypes,
- dataWriteConfig.getBloomIndexParallelism(),
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
- dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex()));
+ cleanMetadata, instantTime, dataMetaClient,
dataWriteConfig.getMetadataConfig(), enabledPartitionTypes,
+ dataWriteConfig.getBloomIndexParallelism()));
closeInternal();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index ebe96d4d597..603b0837994 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -173,6 +173,14 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
.sinceVersion("0.11.0")
.withDocumentation("Comma-separated list of columns for which column
stats index will be built. If not set, all columns will be indexed");
+ public static final ConfigProperty<Integer> COLUMN_STATS_INDEX_MAX_COLUMNS =
ConfigProperty
+ .key(METADATA_PREFIX + ".index.column.stats.max.columns.to.index")
+ .defaultValue(32)
+ .markAdvanced()
+ .sinceVersion("1.0.0")
+ .withDocumentation("Maximum number of columns to generate column stats
for. If the config `" + COLUMN_STATS_INDEX_FOR_COLUMNS.key() + "` is set then
this config will be ignored."
+ + " If unset, then column stats will be generated for the first n
columns in the table schema");
+
public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY =
"in-memory";
public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE =
"engine";
@@ -413,6 +421,10 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return StringUtils.split(getString(COLUMN_STATS_INDEX_FOR_COLUMNS),
CONFIG_VALUES_DELIMITER);
}
+ public Integer maxColumnsToIndexForColStats() {
+ return getIntOrDefault(COLUMN_STATS_INDEX_MAX_COLUMNS);
+ }
+
public String getColumnStatsIndexProcessingModeOverride() {
return getString(COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE);
}
@@ -595,6 +607,11 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return this;
}
+ public Builder withMaxColumnsToIndexForColStats(int maxCols) {
+ metadataConfig.setValue(COLUMN_STATS_INDEX_MAX_COLUMNS,
String.valueOf(maxCols));
+ return this;
+ }
+
public Builder withBloomFilterIndexForColumns(String columns) {
metadataConfig.setValue(BLOOM_FILTER_INDEX_FOR_COLUMNS, columns);
return this;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 4899a68ee7f..12eaa6c5732 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -81,6 +81,7 @@ import org.apache.hudi.common.table.timeline.TimelineFactory;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -153,6 +154,10 @@ import static
org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_CO
import static
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
import static
org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
import static org.apache.hudi.common.fs.FSUtils.getFileNameFromPath;
+import static
org.apache.hudi.common.model.HoodieRecord.COMMIT_TIME_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION;
+import static
org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
import static
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
import static
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
@@ -245,7 +250,7 @@ public class HoodieTableMetadataUtil {
}
colStats.valueCount++;
- if (fieldValue != null && canCompare(fieldSchema,
record.getRecordType())) {
+ if (fieldValue != null && isColumnTypeSupported(fieldSchema,
Option.of(record.getRecordType()))) {
// Set the min value of the field
if (colStats.minValue == null
|| ConvertingGenericData.INSTANCE.compare(fieldValue,
colStats.minValue, fieldSchema) < 0) {
@@ -351,12 +356,10 @@ public class HoodieTableMetadataUtil {
* @param commitMetadata - Commit action metadata
* @param instantTime - Action instant time
* @param dataMetaClient - HoodieTableMetaClient for data
+ * @param metadataConfig - HoodieMetadataConfig
* @param enabledPartitionTypes - List of enabled MDT partitions
* @param bloomFilterType - Type of generated bloom filter
records
* @param bloomIndexParallelism - Parallelism for bloom filter
record generation
- * @param isColumnStatsIndexEnabled - Is column stats index enabled
- * @param columnStatsIndexParallelism - Parallelism for column stats
index records generation
- * @param targetColumnsForColumnStatsIndex - List of columns for column
stats index
* @return Map of partition to metadata records for the commit action
*/
public static Map<String, HoodieData<HoodieRecord>>
convertMetadataToRecords(HoodieEngineContext context,
@@ -364,14 +367,10 @@ public class HoodieTableMetadataUtil {
HoodieCommitMetadata commitMetadata,
String instantTime,
HoodieTableMetaClient dataMetaClient,
+
HoodieMetadataConfig metadataConfig,
List<MetadataPartitionType> enabledPartitionTypes,
String bloomFilterType,
-
int bloomIndexParallelism,
-
boolean isColumnStatsIndexEnabled,
-
int columnStatsIndexParallelism,
-
List<String> targetColumnsForColumnStatsIndex,
-
Integer writesFileIdEncoding,
-
HoodieMetadataConfig metadataConfig) {
+
int bloomIndexParallelism, Integer writesFileIdEncoding) {
final Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap = new
HashMap<>();
final HoodieData<HoodieRecord> filesPartitionRecordsRDD =
context.parallelize(
convertMetadataToFilesPartitionRecords(commitMetadata, instantTime),
1);
@@ -385,7 +384,7 @@ public class HoodieTableMetadataUtil {
if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
final HoodieData<HoodieRecord> metadataColumnStatsRDD =
convertMetadataToColumnStatsRecords(commitMetadata, context,
- dataMetaClient, isColumnStatsIndexEnabled,
columnStatsIndexParallelism, targetColumnsForColumnStatsIndex);
+ dataMetaClient, metadataConfig);
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
metadataColumnStatsRDD);
}
if (enabledPartitionTypes.contains(MetadataPartitionType.PARTITION_STATS))
{
@@ -562,16 +561,13 @@ public class HoodieTableMetadataUtil {
HoodieCleanMetadata cleanMetadata,
String instantTime,
HoodieTableMetaClient dataMetaClient,
+
HoodieMetadataConfig metadataConfig,
List<MetadataPartitionType> enabledPartitionTypes,
-
int bloomIndexParallelism,
-
boolean isColumnStatsIndexEnabled,
-
int columnStatsIndexParallelism,
-
List<String> targetColumnsForColumnStatsIndex) {
+
int bloomIndexParallelism) {
final Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap = new
HashMap<>();
final HoodieData<HoodieRecord> filesPartitionRecordsRDD =
engineContext.parallelize(
convertMetadataToFilesPartitionRecords(cleanMetadata, instantTime), 1);
partitionToRecordsMap.put(MetadataPartitionType.FILES.getPartitionPath(),
filesPartitionRecordsRDD);
-
if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) {
final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD =
convertMetadataToBloomFilterRecords(cleanMetadata, engineContext,
instantTime, bloomIndexParallelism);
@@ -581,11 +577,11 @@ public class HoodieTableMetadataUtil {
if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
final HoodieData<HoodieRecord> metadataColumnStatsRDD =
convertMetadataToColumnStatsRecords(cleanMetadata, engineContext,
- dataMetaClient, isColumnStatsIndexEnabled,
columnStatsIndexParallelism, targetColumnsForColumnStatsIndex);
+ dataMetaClient, metadataConfig);
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
metadataColumnStatsRDD);
}
if
(enabledPartitionTypes.contains(MetadataPartitionType.EXPRESSION_INDEX)) {
- convertMetadataToExpressionIndexRecords(engineContext, cleanMetadata,
instantTime, dataMetaClient, bloomIndexParallelism,
columnStatsIndexParallelism, partitionToRecordsMap);
+ convertMetadataToExpressionIndexRecords(engineContext, cleanMetadata,
instantTime, dataMetaClient, metadataConfig, bloomIndexParallelism,
partitionToRecordsMap);
}
return partitionToRecordsMap;
@@ -593,7 +589,7 @@ public class HoodieTableMetadataUtil {
private static void
convertMetadataToExpressionIndexRecords(HoodieEngineContext engineContext,
HoodieCleanMetadata cleanMetadata,
String
instantTime, HoodieTableMetaClient dataMetaClient,
- int
bloomIndexParallelism, int columnStatsIndexParallelism,
+
HoodieMetadataConfig metadataConfig, int bloomIndexParallelism,
Map<String,
HoodieData<HoodieRecord>> partitionToRecordsMap) {
Option<HoodieIndexMetadata> indexMetadata =
dataMetaClient.getIndexMetadata();
if (indexMetadata.isPresent()) {
@@ -613,8 +609,12 @@ public class HoodieTableMetadataUtil {
if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
partitionToRecordsMap.put(indexName,
convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime,
bloomIndexParallelism));
} else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ HoodieMetadataConfig modifiedMetadataConfig =
HoodieMetadataConfig.newBuilder()
+ .withProperties(metadataConfig.getProps())
+ .withColumnStatsIndexForColumns(String.join(",",
indexDefinition.getSourceFields()))
+ .build();
partitionToRecordsMap.put(indexName,
- convertMetadataToColumnStatsRecords(cleanMetadata,
engineContext, dataMetaClient, true, columnStatsIndexParallelism,
indexDefinition.getSourceFields()));
+ convertMetadataToColumnStatsRecords(cleanMetadata,
engineContext, dataMetaClient, modifiedMetadataConfig));
} else {
throw new HoodieMetadataException("Unsupported expression index
type");
}
@@ -731,17 +731,13 @@ public class HoodieTableMetadataUtil {
* @param cleanMetadata - Clean action metadata
* @param engineContext - Engine context
* @param dataMetaClient - HoodieTableMetaClient for data
- * @param isColumnStatsIndexEnabled - Is column stats index enabled
- * @param columnStatsIndexParallelism - Parallelism for column stats
index records generation
- * @param targetColumnsForColumnStatsIndex - List of columns for column
stats index
+ * @param metadataConfig - HoodieMetadataConfig
* @return List of column stats index records for the clean metadata
*/
public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
HoodieEngineContext engineContext,
HoodieTableMetaClient dataMetaClient,
-
boolean isColumnStatsIndexEnabled,
-
int columnStatsIndexParallelism,
-
List<String> targetColumnsForColumnStatsIndex) {
+
HoodieMetadataConfig metadataConfig) {
List<Pair<String, String>> deleteFileList = new ArrayList<>();
cleanMetadata.getPartitionMetadata().forEach((partition,
partitionMetadata) -> {
// Files deleted from a partition
@@ -749,9 +745,8 @@ public class HoodieTableMetadataUtil {
deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition,
entry)));
});
- List<String> columnsToIndex =
- getColumnsToIndex(isColumnStatsIndexEnabled,
targetColumnsForColumnStatsIndex,
- Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)));
+ List<String> columnsToIndex =
getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig,
+ Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)));
if (columnsToIndex.isEmpty()) {
// In case there are no columns to index, bail
@@ -759,7 +754,7 @@ public class HoodieTableMetadataUtil {
return engineContext.emptyHoodieData();
}
- int parallelism = Math.max(Math.min(deleteFileList.size(),
columnStatsIndexParallelism), 1);
+ int parallelism = Math.max(Math.min(deleteFileList.size(),
metadataConfig.getColumnStatsIndexParallelism()), 1);
return engineContext.parallelize(deleteFileList, parallelism)
.flatMap(deleteFileInfoPair -> {
String partitionPath = deleteFileInfoPair.getLeft();
@@ -1117,14 +1112,12 @@ public class HoodieTableMetadataUtil {
Map<String, List<String>> partitionToDeletedFiles,
Map<String, Map<String, Long>> partitionToAppendedFiles,
HoodieTableMetaClient dataMetaClient,
-
boolean isColumnStatsIndexEnabled,
+
HoodieMetadataConfig metadataConfig,
int
columnStatsIndexParallelism,
-
List<String> targetColumnsForColumnStatsIndex,
int
maxReaderBufferSize) {
// Find the columns to index
- final List<String> columnsToIndex =
- getColumnsToIndex(isColumnStatsIndexEnabled,
targetColumnsForColumnStatsIndex,
- Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)));
+ final List<String> columnsToIndex =
getColumnsToIndex(dataMetaClient.getTableConfig(),
+ metadataConfig, Lazy.lazily(() ->
tryResolveSchemaForTable(dataMetaClient)));
if (columnsToIndex.isEmpty()) {
// In case there are no columns to index, bail
LOG.warn("No columns to index for column stats index.");
@@ -1312,9 +1305,7 @@ public class HoodieTableMetadataUtil {
public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
HoodieEngineContext engineContext,
HoodieTableMetaClient dataMetaClient,
-
boolean isColumnStatsIndexEnabled,
-
int columnStatsIndexParallelism,
-
List<String> targetColumnsForColumnStatsIndex) {
+
HoodieMetadataConfig metadataConfig) {
List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
.flatMap(Collection::stream).collect(Collectors.toList());
@@ -1336,7 +1327,7 @@ public class HoodieTableMetadataUtil {
Option<Schema> tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) :
schema);
- List<String> columnsToIndex =
getColumnsToIndex(isColumnStatsIndexEnabled, targetColumnsForColumnStatsIndex,
+ List<String> columnsToIndex =
getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig,
Lazy.eagerly(tableSchema));
if (columnsToIndex.isEmpty()) {
@@ -1344,7 +1335,7 @@ public class HoodieTableMetadataUtil {
return engineContext.emptyHoodieData();
}
- int parallelism = Math.max(Math.min(allWriteStats.size(),
columnStatsIndexParallelism), 1);
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getColumnStatsIndexParallelism()), 1);
return engineContext.parallelize(allWriteStats, parallelism)
.flatMap(writeStat ->
translateWriteStatToColumnStats(writeStat, dataMetaClient,
columnsToIndex).iterator());
@@ -1353,25 +1344,74 @@ public class HoodieTableMetadataUtil {
}
}
+ @VisibleForTesting
+ static final String[] META_COLS_TO_ALWAYS_INDEX =
{COMMIT_TIME_METADATA_FIELD, RECORD_KEY_METADATA_FIELD,
PARTITION_PATH_METADATA_FIELD};
+ @VisibleForTesting
+ public static final Set<String> META_COL_SET_TO_INDEX = new
HashSet<>(Arrays.asList(META_COLS_TO_ALWAYS_INDEX));
+
+ public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+ HoodieMetadataConfig
metadataConfig,
+ List<String> columnNames) {
+ return getColumnsToIndex(tableConfig, metadataConfig,
Either.left(columnNames), Option.empty());
+ }
+
+ public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+ HoodieMetadataConfig
metadataConfig,
+ Lazy<Option<Schema>>
tableSchema,
+ Option<HoodieRecordType>
recordType) {
+ return getColumnsToIndex(tableConfig, metadataConfig,
Either.right(tableSchema), recordType);
+ }
+
+ public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+ HoodieMetadataConfig
metadataConfig,
+ Lazy<Option<Schema>>
tableSchema) {
+ return getColumnsToIndex(tableConfig, metadataConfig,
Either.right(tableSchema), Option.empty());
+ }
+
+ private static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+ HoodieMetadataConfig
metadataConfig,
+ Either<List<String>,
Lazy<Option<Schema>>> tableSchema,
+ Option<HoodieRecordType>
recordType) {
+ Stream<String> columnsToIndexWithoutRequiredMetas =
getColumnsToIndexWithoutRequiredMetaFields(metadataConfig, tableSchema,
recordType);
+ if (!tableConfig.populateMetaFields()) {
+ return columnsToIndexWithoutRequiredMetas.collect(Collectors.toList());
+ }
+
+ return Stream.concat(Arrays.stream(META_COLS_TO_ALWAYS_INDEX),
columnsToIndexWithoutRequiredMetas).collect(Collectors.toList());
+ }
+
/**
- * Get the list of columns for the table for column stats indexing
+ * Get list of columns that should be indexed for col stats or partition
stats
+ * We always index META_COLS_TO_ALWAYS_INDEX. If
metadataConfig.getColumnsEnabledForColumnStatsIndex()
+ * is empty, we will use metadataConfig.maxColumnsToIndexForColStats() and
index the first n columns in the table in addition to the
+ * required meta cols
+ *
+ * @param metadataConfig metadata config
+ * @param tableSchema either a list of the columns in the table, or
a lazy option of the table schema
+ * @param recordType Option of record type. Used to determine
which types are valid to index
+ * @return list of columns that should be indexed
*/
- private static List<String> getColumnsToIndex(boolean
isColumnStatsIndexEnabled,
- List<String>
targetColumnsForColumnStatsIndex,
- Lazy<Option<Schema>>
lazyWriterSchemaOpt) {
- checkState(isColumnStatsIndexEnabled);
-
- if (!targetColumnsForColumnStatsIndex.isEmpty()) {
- return targetColumnsForColumnStatsIndex;
+ private static Stream<String>
getColumnsToIndexWithoutRequiredMetaFields(HoodieMetadataConfig metadataConfig,
+
Either<List<String>, Lazy<Option<Schema>>> tableSchema,
+
Option<HoodieRecordType> recordType) {
+ List<String> columnsToIndex =
metadataConfig.getColumnsEnabledForColumnStatsIndex();
+ if (!columnsToIndex.isEmpty()) {
+ return columnsToIndex.stream().filter(fieldName ->
!META_COL_SET_TO_INDEX.contains(fieldName));
+ }
+ if (tableSchema.isLeft()) {
+ return getFirstNFieldNames(tableSchema.asLeft().stream(),
metadataConfig.maxColumnsToIndexForColStats());
+ } else {
+ return tableSchema.asRight().get().map(schema ->
getFirstNFieldNames(schema, metadataConfig.maxColumnsToIndexForColStats(),
recordType)).orElse(Stream.empty());
}
+ }
- Option<Schema> writerSchemaOpt = lazyWriterSchemaOpt.get();
- return writerSchemaOpt
- .map(writerSchema ->
- writerSchema.getFields().stream()
- .map(Schema.Field::name)
- .collect(Collectors.toList()))
- .orElse(Collections.emptyList());
+ private static Stream<String> getFirstNFieldNames(Schema tableSchema, int n,
Option<HoodieRecordType> recordType) {
+ return getFirstNFieldNames(tableSchema.getFields().stream()
+ .filter(field -> isColumnTypeSupported(field.schema(),
recordType)).map(Schema.Field::name), n);
+ }
+
+ private static Stream<String> getFirstNFieldNames(Stream<String> fieldNames,
int n) {
+ return fieldNames.filter(fieldName ->
!HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldName)).limit(n);
}
private static Stream<HoodieRecord>
translateWriteStatToColumnStats(HoodieWriteStat writeStat,
@@ -1619,12 +1659,13 @@ public class HoodieTableMetadataUtil {
}
}
- private static boolean canCompare(Schema schema, HoodieRecordType
recordType) {
- // if recordType is SPARK then we cannot compare RECORD and ARRAY types in
addition to MAP type
- if (recordType == HoodieRecordType.SPARK) {
- return schema.getType() != Schema.Type.RECORD && schema.getType() !=
Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP;
+ private static boolean isColumnTypeSupported(Schema schema,
Option<HoodieRecordType> recordType) {
+ // if record type is set and if it's AVRO, MAP is unsupported.
+ if (recordType.isPresent() && recordType.get() == HoodieRecordType.AVRO) {
+ return schema.getType() != Schema.Type.MAP;
}
- return schema.getType() != Schema.Type.MAP;
+ // if record type is not set or if recordType is SPARK then we cannot
compare RECORD and ARRAY types in addition to MAP type
+ return schema.getType() != Schema.Type.RECORD && schema.getType() !=
Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP;
}
public static Set<String> getInflightMetadataPartitions(HoodieTableConfig
tableConfig) {
@@ -2381,10 +2422,7 @@ public class HoodieTableMetadataUtil {
HoodieTableMetaClient dataTableMetaClient,
Option<Schema> writerSchemaOpt) {
Lazy<Option<Schema>> lazyWriterSchemaOpt = writerSchemaOpt.isPresent() ?
Lazy.eagerly(writerSchemaOpt) : Lazy.lazily(() ->
tryResolveSchemaForTable(dataTableMetaClient));
- final List<String> columnsToIndex = getColumnsToIndex(
- metadataConfig.isPartitionStatsIndexEnabled(),
- metadataConfig.getColumnsEnabledForColumnStatsIndex(),
- lazyWriterSchemaOpt);
+ final List<String> columnsToIndex =
getColumnsToIndex(dataTableMetaClient.getTableConfig(), metadataConfig,
lazyWriterSchemaOpt);
if (columnsToIndex.isEmpty()) {
LOG.warn("No columns to index for partition stats index");
return engineContext.emptyHoodieData();
@@ -2456,7 +2494,7 @@ public class HoodieTableMetadataUtil {
HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
Option<Schema> tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
- List<String> columnsToIndex =
getColumnsToIndex(metadataConfig.isPartitionStatsIndexEnabled(),
metadataConfig.getColumnsEnabledForColumnStatsIndex(), writerSchemaOpt);
+ List<String> columnsToIndex =
getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig,
writerSchemaOpt);
if (columnsToIndex.isEmpty()) {
return engineContext.emptyHoodieData();
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 776fd6bbf0b..662600f64e1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -155,7 +155,7 @@ public class TestHoodieAvroUtils {
+
"{\"name\":\"ss\",\"type\":{\"name\":\"ss\",\"type\":\"record\",\"fields\":["
+ "{\"name\":\"fn\",\"type\":[\"null\" ,\"string\"],\"default\":
null},{\"name\":\"ln\",\"type\":[\"null\" ,\"string\"],\"default\": null}]}}]}";
- private static String SCHEMA_WITH_AVRO_TYPES =
"{\"name\":\"TestRecordAvroTypes\",\"type\":\"record\",\"fields\":["
+ public static final String SCHEMA_WITH_AVRO_TYPES =
"{\"name\":\"TestRecordAvroTypes\",\"type\":\"record\",\"fields\":["
// Primitive types
+ "{\"name\":\"booleanField\",\"type\":\"boolean\"},"
+ "{\"name\":\"intField\",\"type\":\"int\"},"
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 30c427ae28e..daf89e47c72 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -19,6 +19,7 @@
package org.apache.hudi.metadata;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
@@ -41,6 +42,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.util.Lazy;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@@ -60,6 +62,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
+import static org.apache.hudi.avro.TestHoodieAvroUtils.SCHEMA_WITH_AVRO_TYPES;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileIDForFileGroup;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.validateDataTypeForSecondaryIndex;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -270,13 +273,17 @@ public class TestHoodieTableMetadataUtil extends
HoodieCommonTestHarness {
metaClient,
Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS));
// Validate the result.
- validatePartitionStats(result, instant1, instant2);
+ validatePartitionStats(result, instant1, instant2, 6);
}
private static void validatePartitionStats(HoodieData<HoodieRecord> result,
String instant1, String instant2) {
+ validatePartitionStats(result, instant1, instant2, 15);
+ }
+
+ private static void validatePartitionStats(HoodieData<HoodieRecord> result,
String instant1, String instant2, int expectedTotalRecords) {
List<HoodieRecord> records = result.collectAsList();
- // 3 partitions * 2 columns = 6 partition stats records
- assertEquals(6, records.size());
+ // 3 partitions * (2 + 3) columns = 15 partition stats records. 3 meta
fields are indexed by default.
+ assertEquals(expectedTotalRecords, records.size());
assertEquals(MetadataPartitionType.PARTITION_STATS.getPartitionPath(),
records.get(0).getPartitionPath());
((HoodieMetadataPayload)
result.collectAsList().get(0).getData()).getColumnStatMetadata().get().getColumnName();
records.forEach(r -> {
@@ -355,4 +362,184 @@ public class TestHoodieTableMetadataUtil extends
HoodieCommonTestHarness {
// Test for complex fields
assertFalse(validateDataTypeForSecondaryIndex(Arrays.asList("arrayField",
"mapField", "structField"), schema));
}
+
+ @Test
+ public void testGetColumnsToIndex() {
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
+
+ //test default
+ HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(true).withMetadataIndexColumnStats(true)
+ .build();
+ List<String> colNames = new ArrayList<>();
+ addNColumns(colNames,
HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue() + 10);
+ List<String> expected = new
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+ addNColumns(expected,
HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue());
+ assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
colNames));
+
+ //test with table schema < default
+ int tableSchemaSize =
HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue() - 10;
+ assertTrue(tableSchemaSize > 0);
+ colNames = new ArrayList<>();
+ addNColumns(colNames, tableSchemaSize);
+ expected = new
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+ addNColumns(expected, tableSchemaSize);
+ assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
colNames));
+
+ //test with max val < tableSchema
+ metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(true).withMetadataIndexColumnStats(true)
+ .withMaxColumnsToIndexForColStats(3)
+ .build();
+ colNames = new ArrayList<>();
+ addNColumns(colNames,
HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue() + 10);
+ expected = new
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+ addNColumns(expected, 3);
+ assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
colNames));
+
+ //test with max val > tableSchema
+ metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(true).withMetadataIndexColumnStats(true)
+
.withMaxColumnsToIndexForColStats(HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue()
+ 10)
+ .build();
+ colNames = new ArrayList<>();
+ addNColumns(colNames, tableSchemaSize);
+ expected = new
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+ addNColumns(expected, tableSchemaSize);
+ assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
colNames));
+
+ //test with list of cols
+ metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(true).withMetadataIndexColumnStats(true)
+ .withColumnStatsIndexForColumns("col_1,col_7,col_11")
+ .build();
+ colNames = new ArrayList<>();
+ addNColumns(colNames, 15);
+ expected = new
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+ expected.add("col_1");
+ expected.add("col_7");
+ expected.add("col_11");
+ assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
colNames));
+
+ //test with list of cols longer than config
+ metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(true).withMetadataIndexColumnStats(true)
+ .withMaxColumnsToIndexForColStats(1)
+ .withColumnStatsIndexForColumns("col_1,col_7,col_11")
+ .build();
+ colNames = new ArrayList<>();
+ addNColumns(colNames, 15);
+ expected = new
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+ expected.add("col_1");
+ expected.add("col_7");
+ expected.add("col_11");
+ assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
colNames));
+
+ //test with a list of cols that includes meta cols in the config
+ metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(true).withMetadataIndexColumnStats(true)
+
.withColumnStatsIndexForColumns("col_1,col_7,_hoodie_commit_time,col_11,_hoodie_commit_seqno")
+ .build();
+ colNames = new ArrayList<>();
+ addNColumns(colNames, 15);
+ expected = new
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+ expected.add("col_1");
+ expected.add("col_7");
+ expected.add("col_11");
+ expected.add("_hoodie_commit_seqno");
+ assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
colNames));
+
+ //test with avro schema
+ Schema schema = new Schema.Parser().parse(SCHEMA_WITH_AVRO_TYPES);
+ metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(true).withMetadataIndexColumnStats(true)
+
.withColumnStatsIndexForColumns("booleanField,decimalField,localTimestampMillisField")
+ .build();
+ expected = new
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+ expected.add("booleanField");
+ expected.add("decimalField");
+ expected.add("localTimestampMillisField");
+ assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
Lazy.eagerly(Option.of(schema))));
+
+ //test with avro schema with max cols set
+ metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(true).withMetadataIndexColumnStats(true)
+ .withMaxColumnsToIndexForColStats(2)
+ .build();
+ expected = new
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+ expected.add("booleanField");
+ expected.add("intField");
+ assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
Lazy.eagerly(Option.of(schema))));
+ //test with avro schema with meta cols
+ assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(schema)))));
+
+ //test with avro schema with type filter
+ metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(true).withMetadataIndexColumnStats(true)
+ .withMaxColumnsToIndexForColStats(100)
+ .build();
+ expected = new
ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
+ expected.add("timestamp");
+ expected.add("_row_key");
+ expected.add("partition_path");
+ expected.add("trip_type");
+ expected.add("rider");
+ expected.add("driver");
+ expected.add("begin_lat");
+ expected.add("begin_lon");
+ expected.add("end_lat");
+ expected.add("end_lon");
+ expected.add("distance_in_meters");
+ expected.add("seconds_since_epoch");
+ expected.add("weight");
+ expected.add("nation");
+ expected.add("current_date");
+ expected.add("current_ts");
+ expected.add("height");
+ expected.add("_hoodie_is_deleted");
+ assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
Lazy.eagerly(Option.of(HoodieTestDataGenerator.AVRO_SCHEMA))));
+ //test with avro schema with meta cols
+ assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_SCHEMA)))));
+
+ //test with meta cols disabled
+ tableConfig.setValue(HoodieTableConfig.POPULATE_META_FIELDS.key(),
"false");
+ metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(true).withMetadataIndexColumnStats(true)
+ .build();
+ colNames = new ArrayList<>();
+ addNColumns(colNames, tableSchemaSize);
+ expected = new ArrayList<>();
+ addNColumns(expected, tableSchemaSize);
+ assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
colNames));
+
+ //test with meta cols disabled with col list
+ metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(true).withMetadataIndexColumnStats(true)
+ .withColumnStatsIndexForColumns("col_1,col_7,col_11")
+ .build();
+ colNames = new ArrayList<>();
+ addNColumns(colNames, 15);
+ expected = new ArrayList<>();
+ expected.add("col_1");
+ expected.add("col_7");
+ expected.add("col_11");
+ assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
colNames));
+
+ //test with meta cols disabled with avro schema
+ metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(true).withMetadataIndexColumnStats(true)
+
.withColumnStatsIndexForColumns("booleanField,decimalField,localTimestampMillisField")
+ .build();
+ expected = new ArrayList<>();
+ expected.add("booleanField");
+ expected.add("decimalField");
+ expected.add("localTimestampMillisField");
+ assertEquals(expected,
HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
Lazy.eagerly(Option.of(schema))));
+ }
+
+ private void addNColumns(List<String> list, int n) {
+ for (int i = 0; i < n; i++) {
+ list.add("col_" + i);
+ }
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 7c822ef2593..3eaa59d6b80 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -33,6 +33,8 @@ import org.apache.hudi.common.util.hash.{ColumnIndexID,
PartitionIndexID}
import org.apache.hudi.data.HoodieJavaRDD
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata,
HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.hudi.util.JFunction
+import org.apache.hudi.util.JavaScalaConverters.convertScalaListToJavaList
+
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.generic.GenericData
import
org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows,
createDataFrameFromRDD, createDataFrameFromRows}
@@ -45,6 +47,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import java.nio.ByteBuffer
+
import scala.collection.JavaConverters._
import scala.collection.immutable.TreeSet
import scala.collection.mutable.ListBuffer
@@ -63,17 +66,9 @@ class ColumnStatsIndexSupport(spark: SparkSession,
// on to the executor
protected val inMemoryProjectionThreshold =
metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
- private lazy val indexedColumns: Set[String] = {
- val customIndexedColumns =
metadataConfig.getColumnsEnabledForColumnStatsIndex
- // Column Stats Index could index either
- // - The whole table
- // - Only configured columns
- if (customIndexedColumns.isEmpty) {
- tableSchema.fieldNames.toSet
- } else {
- customIndexedColumns.asScala.toSet
- }
- }
+ private lazy val indexedColumns: Set[String] = HoodieTableMetadataUtil
+ .getColumnsToIndex(metaClient.getTableConfig, metadataConfig,
convertScalaListToJavaList(tableSchema.fieldNames)).asScala.toSet
+
override def getIndexName: String = ColumnStatsIndexSupport.INDEX_NAME
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java
index e22a91c525f..e5fe0a3ad00 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java
@@ -114,6 +114,10 @@ public class TestDataSkippingWithMORColstats extends
HoodieSparkClientTestBase {
*/
@Test
public void testBaseFileOnly() {
+ // note that this config is here just to test that it does nothing since
+ // we have specified which columns to index
+ options.put(HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.key(),
"1");
+
options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(),
"false");
Dataset<Row> inserts = makeInsertDf("000", 100);
Dataset<Row> batch1 = inserts.where(matchCond);
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/column-stats-index-table-short-schema.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/column-stats-index-table-short-schema.json
new file mode 100644
index 00000000000..446c9df217e
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/column-stats-index-table-short-schema.json
@@ -0,0 +1,4 @@
+{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"c2_maxValue":"
769sdc","c2_minValue":"
309sdc","c2_nullCount":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_nullCount":0,"valueCount":9}
+{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
932sdc","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,
"valueCount":8}
+{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":"
943sdc","c2_minValue":"
200sdc","c2_nullCount":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_nullCount":0,
"valueCount":10}
+{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":"
959sdc","c2_minValue":"
181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,
"valueCount":13}
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-updated2-column-stats-index-table-short-schema.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-updated2-column-stats-index-table-short-schema.json
new file mode 100644
index 00000000000..a421adfb5fc
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-updated2-column-stats-index-table-short-schema.json
@@ -0,0 +1,13 @@
+{"c1_maxValue":101,"c1_minValue":101,"c1_nullCount":0,"c2_maxValue":"
999sdc","c2_minValue":"
999sdc","c2_nullCount":0,"c3_maxValue":10.329,"c3_minValue":10.329,"c3_nullCount":0,"valueCount":1}
+{"c1_maxValue":568,"c1_minValue":8,"c1_nullCount":0,"c2_maxValue":"
8sdc","c2_minValue":"
111sdc","c2_nullCount":0,"c3_maxValue":979.272,"c3_minValue":82.111,"c3_nullCount":0,"valueCount":15}
+{"c1_maxValue":715,"c1_minValue":76,"c1_nullCount":0,"c2_maxValue":"
76sdc","c2_minValue":"
224sdc","c2_nullCount":0,"c3_maxValue":958.579,"c3_minValue":246.427,"c3_nullCount":0,"valueCount":12}
+{"c1_maxValue":768,"c1_minValue":59,"c1_nullCount":0,"c2_maxValue":"
768sdc","c2_minValue":"
118sdc","c2_nullCount":0,"c3_maxValue":959.131,"c3_minValue":64.768,"c3_nullCount":0,"valueCount":7}
+{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"c2_maxValue":"
769sdc","c2_minValue":"
309sdc","c2_nullCount":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_nullCount":0,"valueCount":9}
+{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"c2_maxValue":"
985sdc","c2_minValue":"
309sdc","c2_nullCount":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_nullCount":0,"valueCount":9}
+{"c1_maxValue":770,"c1_minValue":129,"c1_nullCount":0,"c2_maxValue":"
770sdc","c2_minValue":"
129sdc","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":153.431,"c3_nullCount":0,"valueCount":6}
+{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
932sdc","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"valueCount":8}
+{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
987sdc","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"valueCount":8}
+{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":"
943sdc","c2_minValue":"
200sdc","c2_nullCount":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_nullCount":0,"valueCount":10}
+{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":"
984sdc","c2_minValue":"
200sdc","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_nullCount":1,"valueCount":10}
+{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":"
959sdc","c2_minValue":"
181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"valueCount":13}
+{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":"
989sdc","c2_minValue":"
181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"valueCount":13}
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-updated2-column-stats-index-table-short-schema.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-updated2-column-stats-index-table-short-schema.json
new file mode 100644
index 00000000000..382ba58abf0
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-updated2-column-stats-index-table-short-schema.json
@@ -0,0 +1,13 @@
+{"c1_maxValue":101,"c1_minValue":101,"c1_nullCount":0,"c2_maxValue":"
999sdc","c2_minValue":"
999sdc","c2_nullCount":0,"c3_maxValue":10.329,"c3_minValue":10.329,"c3_nullCount":0,"valueCount":1}
+{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":"
984sdc","c2_minValue":"
980sdc","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_nullCount":1,"valueCount":4}
+{"c1_maxValue":568,"c1_minValue":8,"c1_nullCount":0,"c2_maxValue":"
8sdc","c2_minValue":"
111sdc","c2_nullCount":0,"c3_maxValue":979.272,"c3_minValue":82.111,"c3_nullCount":0,"valueCount":15}
+{"c1_maxValue":619,"c1_minValue":619,"c1_nullCount":0,"c2_maxValue":"
985sdc","c2_minValue":"
985sdc","c2_nullCount":0,"c3_maxValue":230.320,"c3_minValue":230.320,"c3_nullCount":0,"valueCount":1}
+{"c1_maxValue":633,"c1_minValue":624,"c1_nullCount":0,"c2_maxValue":"
987sdc","c2_minValue":"
986sdc","c2_nullCount":0,"c3_maxValue":580.317,"c3_minValue":375.308,"c3_nullCount":0,"valueCount":2}
+{"c1_maxValue":639,"c1_minValue":555,"c1_nullCount":0,"c2_maxValue":"
989sdc","c2_minValue":"
982sdc","c2_nullCount":0,"c3_maxValue":904.304,"c3_minValue":153.431,"c3_nullCount":0,"valueCount":3}
+{"c1_maxValue":715,"c1_minValue":76,"c1_nullCount":0,"c2_maxValue":"
76sdc","c2_minValue":"
224sdc","c2_nullCount":0,"c3_maxValue":958.579,"c3_minValue":246.427,"c3_nullCount":0,"valueCount":12}
+{"c1_maxValue":768,"c1_minValue":59,"c1_nullCount":0,"c2_maxValue":"
768sdc","c2_minValue":"
118sdc","c2_nullCount":0,"c3_maxValue":959.131,"c3_minValue":64.768,"c3_nullCount":0,"valueCount":7}
+{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"c2_maxValue":"
769sdc","c2_minValue":"
309sdc","c2_nullCount":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_nullCount":0,"valueCount":9}
+{"c1_maxValue":770,"c1_minValue":129,"c1_nullCount":0,"c2_maxValue":"
770sdc","c2_minValue":"
129sdc","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":153.431,"c3_nullCount":0,"valueCount":6}
+{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
932sdc","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"valueCount":8}
+{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":"
943sdc","c2_minValue":"
200sdc","c2_nullCount":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_nullCount":0,"valueCount":10}
+{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":"
959sdc","c2_minValue":"
181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"valueCount":13}
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-column-stats-index-table-short-schema.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-column-stats-index-table-short-schema.json
new file mode 100644
index 00000000000..bae2a86a04a
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-column-stats-index-table-short-schema.json
@@ -0,0 +1,8 @@
+{"c1_maxValue":568,"c1_minValue":8,"c1_nullCount":0,"c2_maxValue":"
8sdc","c2_minValue":"
111sdc","c2_nullCount":0,"c3_maxValue":979.272,"c3_minValue":82.111,"c3_nullCount":0,"valueCount":15}
+{"c1_maxValue":715,"c1_minValue":76,"c1_nullCount":0,"c2_maxValue":"
76sdc","c2_minValue":"
224sdc","c2_nullCount":0,"c3_maxValue":958.579,"c3_minValue":246.427,"c3_nullCount":0,"valueCount":12}
+{"c1_maxValue":768,"c1_minValue":59,"c1_nullCount":0,"c2_maxValue":"
768sdc","c2_minValue":"
118sdc","c2_nullCount":0,"c3_maxValue":959.131,"c3_minValue":64.768,"c3_nullCount":0,"valueCount":7}
+{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"c2_maxValue":"
769sdc","c2_minValue":"
309sdc","c2_nullCount":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_nullCount":0,"valueCount":9}
+{"c1_maxValue":770,"c1_minValue":129,"c1_nullCount":0,"c2_maxValue":"
770sdc","c2_minValue":"
129sdc","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":153.431,"c3_nullCount":0,"valueCount":6}
+{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
932sdc","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"valueCount":8}
+{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":"
943sdc","c2_minValue":"
200sdc","c2_nullCount":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_nullCount":0,"valueCount":10}
+{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":"
959sdc","c2_minValue":"
181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"valueCount":13}
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
index b0e23ac1902..582054cb3e6 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
@@ -33,10 +33,14 @@ import org.apache.hudi.storage.StoragePath
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
+
import org.apache.spark.sql._
import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestParams
+import org.apache.hudi.metadata.HoodieTableMetadataUtil
import org.apache.hudi.testutils.{HoodieSparkClientTestBase,
LogFileColStatsTestUtil}
+import org.apache.hudi.util.JavaScalaConverters.convertScalaListToJavaList
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
+
import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
@@ -49,6 +53,7 @@ import java.sql.{Date, Timestamp}
import java.util
import java.util.List
import java.util.stream.Collectors
+
import scala.collection.JavaConverters._
import scala.util.Random
@@ -115,8 +120,8 @@ class ColumnStatIndexTestBase extends
HoodieSparkClientTestBase {
// Currently, routine manually validating the column stats (by actually
reading every column of every file)
// only supports parquet files. Therefore we skip such validation when
delta-log files are present, and only
// validate in following cases: (1) COW: all operations; (2) MOR: insert
only.
- val shouldValidateColumnStatsManually = params.testCase.tableType ==
HoodieTableType.COPY_ON_WRITE ||
-
params.operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ val shouldValidateColumnStatsManually = (params.testCase.tableType ==
HoodieTableType.COPY_ON_WRITE ||
+
params.operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)) &&
params.shouldValidateManually
validateColumnStatsIndex(
params.testCase, params.metadataOpts,
params.expectedColStatsSourcePath, shouldValidateColumnStatsManually,
params.latestCompletedCommit)
@@ -228,17 +233,16 @@ class ColumnStatIndexTestBase extends
HoodieSparkClientTestBase {
val columnStatsIndex = new ColumnStatsIndexSupport(spark,
sourceTableSchema, metadataConfig, metaClient)
- val indexedColumns: Set[String] = {
- val customIndexedColumns =
metadataConfig.getColumnsEnabledForColumnStatsIndex
- if (customIndexedColumns.isEmpty) {
- sourceTableSchema.fieldNames.toSet
- } else {
- customIndexedColumns.asScala.toSet
- }
- }
+ val indexedColumns: Set[String] = HoodieTableMetadataUtil
+ .getColumnsToIndex(metaClient.getTableConfig, metadataConfig,
convertScalaListToJavaList(sourceTableSchema.fieldNames)).asScala.toSet
+
val (expectedColStatsSchema, _) =
composeIndexSchema(sourceTableSchema.fieldNames, indexedColumns,
sourceTableSchema)
- val validationSortColumns = Seq("c1_maxValue", "c1_minValue",
"c2_maxValue", "c2_minValue", "c3_maxValue",
+ val validationSortColumns = if (indexedColumns.contains("c5")) {
+ Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue",
"c3_maxValue",
"c3_minValue", "c5_maxValue", "c5_minValue")
+ } else {
+ Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue",
"c3_maxValue", "c3_minValue")
+ }
columnStatsIndex.loadTransposed(sourceTableSchema.fieldNames,
testCase.shouldReadInMemory) { transposedColStatsDF =>
// Match against expected column stats table
@@ -363,6 +367,7 @@ object ColumnStatIndexTestBase {
operation: String,
saveMode: SaveMode,
shouldValidate: Boolean = true,
+ shouldValidateManually: Boolean = true,
latestCompletedCommit: String = null,
numPartitions: Integer = 4,
parquetMaxFileSize: Integer = 10 * 1024,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index a78e31a08d2..63d41b60a06 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -197,8 +197,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
- true,
- latestCompletedCommit,
+ latestCompletedCommit = latestCompletedCommit,
numPartitions = 1,
parquetMaxFileSize = 100 * 1024 * 1024,
smallFileLimit = 0))
@@ -215,8 +214,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
expectedColStatsSourcePath = expectedColStatsSourcePath1,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
- true,
- latestCompletedCommit,
+ latestCompletedCommit = latestCompletedCommit,
numPartitions = 1,
parquetMaxFileSize = 100 * 1024 * 1024,
smallFileLimit = 0))
@@ -248,7 +246,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
expectedColStatsSourcePath = null,
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Overwrite,
- false,
+ shouldValidate = false,
numPartitions = 1,
parquetMaxFileSize = 100 * 1024 * 1024,
smallFileLimit = 0))
@@ -259,7 +257,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
expectedColStatsSourcePath = null,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
- false,
+ shouldValidate = false,
numPartitions = 1,
parquetMaxFileSize = 100 * 1024 * 1024,
smallFileLimit = 0))
@@ -289,8 +287,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
- true,
- latestCompletedCommit,
+ latestCompletedCommit = latestCompletedCommit,
numPartitions = 1,
parquetMaxFileSize = 100 * 1024 * 1024,
smallFileLimit = 0))
@@ -381,7 +378,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
expectedColStatsSourcePath = null,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
- false,
+ shouldValidate = false,
numPartitions = 1,
parquetMaxFileSize = 100 * 1024 * 1024,
smallFileLimit = 0))
@@ -394,7 +391,6 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
- true,
numPartitions = 1,
parquetMaxFileSize = 100 * 1024 * 1024,
smallFileLimit = 0))
@@ -427,7 +423,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
expectedColStatsSourcePath = null,
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Overwrite,
- false,
+ shouldValidate = false,
numPartitions = 1,
parquetMaxFileSize = 100 * 1024 * 1024,
smallFileLimit = 0))
@@ -443,7 +439,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
expectedColStatsSourcePath = null,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
- false,
+ shouldValidate = false,
numPartitions = 1,
parquetMaxFileSize = 100 * 1024 * 1024,
smallFileLimit = 0))
@@ -460,7 +456,6 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
- true,
numPartitions = 1,
parquetMaxFileSize = 100 * 1024 * 1024,
smallFileLimit = 0))
@@ -494,7 +489,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
expectedColStatsSourcePath = null,
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Overwrite,
- false,
+ shouldValidate = false,
numPartitions = 1,
parquetMaxFileSize = 100 * 1024 * 1024,
smallFileLimit = 0))
@@ -510,7 +505,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
expectedColStatsSourcePath = null,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
- false,
+ shouldValidate = false,
numPartitions = 1,
parquetMaxFileSize = 100 * 1024 * 1024,
smallFileLimit = 0))
@@ -527,7 +522,6 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
- true,
numPartitions = 1,
parquetMaxFileSize = 100 * 1024 * 1024,
smallFileLimit = 0))
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
index 78e04b2f417..59558796888 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
@@ -67,6 +67,29 @@ class TestColumnStatsIndexWithSQL extends
ColumnStatIndexTestBase {
verifyFileIndexAndSQLQueries(commonOpts)
}
+ @ParameterizedTest
+ @MethodSource(Array("testMetadataColumnStatsIndexParams"))
+ def testMetadataColumnStatsIndexWithSQLWithLimitedIndexes(testCase:
ColumnStatsTestCase): Unit = {
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+ HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.key() -> "3"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+ DataSourceReadOptions.QUERY_TYPE.key ->
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL
+ ) ++ metadataOpts
+ setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true,
useShortSchema = true)
+ }
+
@ParameterizedTest
@MethodSource(Array("testMetadataColumnStatsIndexParamsForMOR"))
def testMetadataColumnStatsIndexSQLWithInMemoryIndex(testCase:
ColumnStatsTestCase): Unit = {
@@ -204,25 +227,33 @@ class TestColumnStatsIndexWithSQL extends
ColumnStatIndexTestBase {
verifyFileIndexAndSQLQueries(commonOpts)
}
- private def setupTable(testCase: ColumnStatsTestCase, metadataOpts:
Map[String, String], commonOpts: Map[String, String], shouldValidate: Boolean):
Unit = {
+ private def setupTable(testCase: ColumnStatsTestCase, metadataOpts:
Map[String, String], commonOpts: Map[String, String],
+ shouldValidate: Boolean, useShortSchema: Boolean =
false): Unit = {
+ val filePostfix = if (useShortSchema) {
+ "-short-schema"
+ } else {
+ ""
+ }
doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/input-table-json",
- expectedColStatsSourcePath =
"index/colstats/column-stats-index-table.json",
+ expectedColStatsSourcePath =
s"index/colstats/column-stats-index-table${filePostfix}.json",
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ shouldValidateManually = !useShortSchema,
saveMode = SaveMode.Overwrite))
doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/another-input-table-json",
- expectedColStatsSourcePath =
"index/colstats/updated-column-stats-index-table.json",
+ expectedColStatsSourcePath =
s"index/colstats/updated-column-stats-index-table${filePostfix}.json",
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ shouldValidateManually = !useShortSchema,
saveMode = SaveMode.Append))
// NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
// deferred updates), diverging from COW
val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
- "index/colstats/cow-updated2-column-stats-index-table.json"
+
s"index/colstats/cow-updated2-column-stats-index-table${filePostfix}.json"
} else {
- "index/colstats/mor-updated2-column-stats-index-table.json"
+
s"index/colstats/mor-updated2-column-stats-index-table${filePostfix}.json"
}
doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
@@ -230,7 +261,8 @@ class TestColumnStatsIndexWithSQL extends
ColumnStatIndexTestBase {
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
- shouldValidate))
+ shouldValidate,
+ shouldValidateManually = !useShortSchema))
}
def verifyFileIndexAndSQLQueries(opts: Map[String, String],
isTableDataSameAsAfterSecondInstant: Boolean = false, verifyFileCount: Boolean
= true): Unit = {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 9e3c09edb76..1fc1b7e7777 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -893,7 +893,8 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
def testReadPathsForOnlyLogFiles(recordType: HoodieRecordType): Unit = {
val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
// enable column stats
- val hudiOpts = writeOpts ++
Map(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true")
+ val hudiOpts = writeOpts ++
Map(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+ HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key() ->
"fare,city_to_state,rider")
initMetaClient(HoodieTableType.MERGE_ON_READ)
val records1 = dataGen.generateInsertsContainsAllPartitions("000", 20)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
index f6873c8bd3d..6e214e3fe35 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
@@ -616,7 +616,8 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
val result4DF = spark.sql(
s"select type, key, ColumnStatsMetadata from
hudi_metadata('$identifier') where
type=${MetadataPartitionType.COLUMN_STATS.getRecordType}"
)
- assert(result4DF.count() == 3)
+ // 3 meta columns are always indexed so 3 stats per column * (3 meta
cols + 1 data col) = 12
+ assert(result4DF.count() == 12)
val result5DF = spark.sql(
s"select type, key, recordIndexMetadata from
hudi_metadata('$identifier') where
type=${MetadataPartitionType.RECORD_INDEX.getRecordType}"
@@ -661,7 +662,8 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
| preCombineField = 'ts',
| hoodie.datasource.write.recordkey.field = 'id',
| hoodie.metadata.index.partition.stats.enable = 'true',
- | hoodie.metadata.index.column.stats.column.list = 'price'
+ | hoodie.metadata.index.column.stats.column.list = 'price',
+ | hoodie.populate.meta.fields = 'false'
|)
|location '${tmp.getCanonicalPath}/$tableName'
|""".stripMargin
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 63079544886..8a8bf31a8ee 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -135,6 +135,7 @@ import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTim
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static
org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
import static
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.META_COL_SET_TO_INDEX;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getLocationFromRecordIndexInfo;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getLogFileColumnRangeMetadata;
@@ -1823,7 +1824,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
private List<String> getAllColumnNames() {
try {
- return schema.getFields().stream()
+ return schema.getFields().stream().filter(field ->
META_COL_SET_TO_INDEX.contains(field.name()))
.map(Schema.Field::name).collect(Collectors.toList());
} catch (Exception e) {
throw new HoodieException("Failed to get all column names for " +
metaClient.getBasePath());