This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b4e1e5eba1c [HUDI-8208] Fix partition stats bound when compacting or
clustering (#12050)
b4e1e5eba1c is described below
commit b4e1e5eba1c7a5b03dff7d4bad5878901b79e3b0
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Oct 15 02:43:19 2024 +0530
[HUDI-8208] Fix partition stats bound when compacting or clustering (#12050)
The [min, max] range in column stats or partition stats can keep widening
with updates or deletes, because we simply take the min of all mins and the max of
all maxes while merging the stats. This can lead to a degenerative case where all
partitions qualify for a predicate based on stats, even though actually very
few partitions meet the predicate based on actual data. It defeats the purpose
of pruning/skipping using stats. To fix this problem, we need to bring the
range to a tighter bound. In o [...]
- Adds a flag in column stats metadata payload - isTightBound - to indicate
whether min/max range is a tighter bound based on latest snapshot or not. It is
false by default and set to true during compaction or clustering.
- Adds a config to disable calculating tight bounds. Enabled by default for
compaction and clustering.
- To calculate tight bound, we look at the colstats partition for the
uncompacted or unclustered files and then merge the colstats with that of the
compacted or clustered files. Most of the changes are in
HoodieTableMetadataUtil.
---------
Co-authored-by: sivabalan <[email protected]>
---
.../metadata/HoodieBackedTableMetadataWriter.java | 18 ++-
hudi-common/src/main/avro/HoodieMetadata.avsc | 11 ++
.../hudi/common/config/HoodieMetadataConfig.java | 18 +++
.../common/model/HoodieColumnRangeMetadata.java | 18 +++
.../hudi/common/model/WriteOperationType.java | 4 +
.../hudi/metadata/HoodieBackedTableMetadata.java | 2 +-
.../hudi/metadata/HoodieMetadataPayload.java | 3 +-
.../hudi/metadata/HoodieTableMetadataUtil.java | 162 ++++++++++++++++-----
.../apache/hudi/utils/TestAvroSchemaConverter.java | 3 +-
.../common/functional/TestHoodieLogFormat.java | 86 +----------
.../common/testutils/HoodieCommonTestHarness.java | 126 ++++++++++++++++
.../hudi/metadata/TestHoodieMetadataPayload.java | 12 +-
.../hudi/metadata/TestHoodieTableMetadataUtil.java | 110 +++++++++++---
.../TestPartitionStatsIndexWithSql.scala | 79 ++++++----
.../utilities/HoodieMetadataTableValidator.java | 5 +-
15 files changed, 468 insertions(+), 189 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index f72e47ad98f..cc101ed6ade 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -490,7 +490,8 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
}
private Pair<Integer, HoodieData<HoodieRecord>>
initializePartitionStatsIndex(List<DirectoryInfo> partitionInfoList) {
- HoodieData<HoodieRecord> records =
HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext,
partitionInfoList, dataWriteConfig.getMetadataConfig(), dataMetaClient);
+ HoodieData<HoodieRecord> records =
HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext,
partitionInfoList, dataWriteConfig.getMetadataConfig(), dataMetaClient,
+ Option.of(new
Schema.Parser().parse(dataWriteConfig.getWriteSchema())));
final int fileGroupCount =
dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount();
return Pair.of(fileGroupCount, records);
}
@@ -590,13 +591,14 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
}
private List<Pair<String, FileSlice>> getPartitionFileSlicePairs() throws
IOException {
- HoodieMetadataFileSystemView fsView = getMetadataView();
- // Collect the list of latest file slices present in each partition
- List<String> partitions = metadata.getAllPartitionPaths();
- fsView.loadAllPartitions();
- List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
- partitions.forEach(partition ->
fsView.getLatestFileSlices(partition).forEach(fs ->
partitionFileSlicePairs.add(Pair.of(partition, fs))));
- return partitionFileSlicePairs;
+ try (HoodieMetadataFileSystemView fsView = getMetadataView()) {
+ // Collect the list of latest file slices present in each partition
+ List<String> partitions = metadata.getAllPartitionPaths();
+ fsView.loadAllPartitions();
+ List<Pair<String, FileSlice>> partitionFileSlicePairs = new
ArrayList<>();
+ partitions.forEach(partition ->
fsView.getLatestFileSlices(partition).forEach(fs ->
partitionFileSlicePairs.add(Pair.of(partition, fs))));
+ return partitionFileSlicePairs;
+ }
}
private Pair<Integer, HoodieData<HoodieRecord>>
initializeRecordIndexPartition() throws IOException {
diff --git a/hudi-common/src/main/avro/HoodieMetadata.avsc
b/hudi-common/src/main/avro/HoodieMetadata.avsc
index e7a5a1e145d..f092c3b2a63 100644
--- a/hudi-common/src/main/avro/HoodieMetadata.avsc
+++ b/hudi-common/src/main/avro/HoodieMetadata.avsc
@@ -353,6 +353,17 @@
"doc": "Column range entry valid/deleted flag",
"name": "isDeleted",
"type": "boolean"
+ },
+ // NOTE: This is a new field added in 1.0.0.
+ // Typically, the min/max range for each column
can become wider i.e. the minValue <= all
+ // valid values and the maxValue >= all valid
values in the file with updates and deletes.
+ // For effective pruning, the min/max range can
be updated to a tighter bound during
+ // compaction or clustering.
+ {
+ "doc": "Whether the min/max range of a column is
tight bound or not",
+ "name": "isTightBound",
+ "type": "boolean",
+ "default": false
}
]
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 736b21e847a..c84723db8ed 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -358,6 +358,15 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Parallelism to use, when generating partition stats
index.");
+ public static final ConfigProperty<Boolean>
ENABLE_PARTITION_STATS_INDEX_TIGHT_BOUND = ConfigProperty
+ .key(METADATA_PREFIX + ".index.partition.stats.tightBound.enable")
+ .defaultValue(true)
+ .sinceVersion("1.0.0")
+ .withDocumentation("Enable tight bound for the min/max value for each
column at the storage partition level. "
+ + "Typically, the min/max range for each column can become wider
(i.e. the minValue is <= all valid values "
+ + "in the file, and the maxValue >= all valid values in the file)
with updates and deletes. If this config is enabled, "
+ + "the min/max range will be updated to the tight bound of the valid
values in the latest snapshot of the partition.");
+
public static final ConfigProperty<Boolean> SECONDARY_INDEX_ENABLE_PROP =
ConfigProperty
.key(METADATA_PREFIX + ".index.secondary.enable")
.defaultValue(false)
@@ -525,6 +534,10 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return getInt(PARTITION_STATS_INDEX_PARALLELISM);
}
+ public boolean isPartitionStatsIndexTightBoundEnabled() {
+ return getBooleanOrDefault(ENABLE_PARTITION_STATS_INDEX_TIGHT_BOUND);
+ }
+
public boolean isSecondaryIndexEnabled() {
// Secondary index is enabled only iff record index (primary key index) is
also enabled
return isRecordIndexEnabled() && getBoolean(SECONDARY_INDEX_ENABLE_PROP);
@@ -736,6 +749,11 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return this;
}
+ public Builder withPartitionStatsIndexTightBound(boolean enable) {
+ metadataConfig.setValue(ENABLE_PARTITION_STATS_INDEX_TIGHT_BOUND,
String.valueOf(enable));
+ return this;
+ }
+
public HoodieMetadataConfig build() {
metadataConfig.setDefaultValue(ENABLE,
getDefaultMetadataEnable(engineType));
metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
index 1c39acb3c8c..50d2a03c025 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.model;
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.common.util.ValidationUtils;
import javax.annotation.Nullable;
@@ -25,6 +26,8 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Objects;
+import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
+
/**
* Hoodie metadata for the column range of data stored in columnar format
(like Parquet)
*
@@ -146,6 +149,21 @@ public class HoodieColumnRangeMetadata<T extends
Comparable> implements Serializ
return new HoodieColumnRangeMetadata<>(filePath, columnName, minValue,
maxValue, nullCount, valueCount, totalSize, totalUncompressedSize);
}
+ /**
+ * Converts instance of {@link HoodieMetadataColumnStats} to {@link
HoodieColumnRangeMetadata}
+ */
+ public static HoodieColumnRangeMetadata<Comparable>
fromColumnStats(HoodieMetadataColumnStats columnStats) {
+ return HoodieColumnRangeMetadata.<Comparable>create(
+ columnStats.getFileName(),
+ columnStats.getColumnName(),
+ unwrapAvroValueWrapper(columnStats.getMinValue()),
+ unwrapAvroValueWrapper(columnStats.getMaxValue()),
+ columnStats.getNullCount(),
+ columnStats.getValueCount(),
+ columnStats.getTotalSize(),
+ columnStats.getTotalUncompressedSize());
+ }
+
@SuppressWarnings("rawtype")
public static HoodieColumnRangeMetadata<Comparable> stub(String filePath,
String columnName) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index e69b4036a1a..d41e2d2aee8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -168,4 +168,8 @@ public enum WriteOperationType {
public static boolean isPreppedWriteOperation(WriteOperationType
operationType) {
return operationType == BULK_INSERT_PREPPED || operationType ==
INSERT_PREPPED | operationType == UPSERT_PREPPED || operationType ==
DELETE_PREPPED;
}
+
+ public static boolean isPartitionStatsTightBoundRequired(WriteOperationType
operationType) {
+ return operationType == COMPACT || operationType == CLUSTER;
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 51466d12474..1f1c7f170d1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -214,7 +214,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
checkState(!partitionFileSlices.isEmpty(), "Number of file slices for
partition " + partitionName + " should be > 0");
return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) :
- engineContext.parallelize(partitionFileSlices))
+ getEngineContext().parallelize(partitionFileSlices))
.flatMap(
(SerializableFunction<FileSlice,
Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
// NOTE: Since this will be executed by executors, we can't
access previously cached
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 7e32acd0038..6035dbcbd49 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -519,7 +519,7 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
public static Stream<HoodieRecord> createPartitionStatsRecords(String
partitionPath,
Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
- boolean
isDeleted) {
+ boolean
isDeleted, boolean isTightBound) {
return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
HoodieKey key = new HoodieKey(getPartitionStatsIndexKey(partitionPath,
columnRangeMetadata.getColumnName()),
MetadataPartitionType.PARTITION_STATS.getPartitionPath());
@@ -535,6 +535,7 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
.setTotalSize(columnRangeMetadata.getTotalSize())
.setTotalUncompressedSize(columnRangeMetadata.getTotalUncompressedSize())
.setIsDeleted(isDeleted)
+ .setIsTightBound(isTightBound)
.build(),
MetadataPartitionType.PARTITION_STATS.getRecordType());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 562504e56bb..286187a38ed 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.FloatWrapper;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRecordIndexInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -59,11 +60,13 @@ import
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -77,6 +80,7 @@ import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Tuple3;
@@ -129,7 +133,6 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static java.util.stream.Collectors.toList;
import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
import static
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
@@ -138,6 +141,7 @@ import static
org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
import static
org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
import static
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static
org.apache.hudi.common.config.HoodieCommonConfig.MAX_DFS_STREAM_BUFFER_SIZE;
import static
org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION;
import static
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
import static
org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
@@ -271,21 +275,6 @@ public class HoodieTableMetadataUtil {
Collectors.toMap(HoodieColumnRangeMetadata::getColumnName,
Function.identity()));
}
- /**
- * Converts instance of {@link HoodieMetadataColumnStats} to {@link
HoodieColumnRangeMetadata}
- */
- public static HoodieColumnRangeMetadata<Comparable>
convertColumnStatsRecordToColumnRangeMetadata(HoodieMetadataColumnStats
columnStats) {
- return HoodieColumnRangeMetadata.<Comparable>create(
- columnStats.getFileName(),
- columnStats.getColumnName(),
- unwrapAvroValueWrapper(columnStats.getMinValue()),
- unwrapAvroValueWrapper(columnStats.getMaxValue()),
- columnStats.getNullCount(),
- columnStats.getValueCount(),
- columnStats.getTotalSize(),
- columnStats.getTotalUncompressedSize());
- }
-
public static Option<String> getColumnStatsValueAsString(Object statsValue) {
if (statsValue == null) {
LOG.info("Invalid column stats value: {}", statsValue);
@@ -1215,20 +1204,25 @@ public class HoodieTableMetadataUtil {
}
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata =
- readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex);
+ readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex, false, Option.empty());
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath,
columnRangeMetadata, false);
}
private static List<HoodieColumnRangeMetadata<Comparable>>
readColumnRangeMetadataFrom(String filePath,
HoodieTableMetaClient datasetMetaClient,
-
List<String> columnsToIndex) {
+
List<String> columnsToIndex,
+
boolean shouldReadColumnStatsForLogFiles,
+
Option<Schema> writerSchemaOpt) {
try {
+ StoragePath fullFilePath = new
StoragePath(datasetMetaClient.getBasePath(), filePath);
if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
- StoragePath fullFilePath = new
StoragePath(datasetMetaClient.getBasePath(), filePath);
return HoodieIOFactory.getIOFactory(datasetMetaClient.getStorage())
.getFileFormatUtils(HoodieFileFormat.PARQUET)
.readColumnStatsFromMetadata(datasetMetaClient.getStorage(),
fullFilePath, columnsToIndex);
+ } else if (FSUtils.isLogFile(fullFilePath) &&
shouldReadColumnStatsForLogFiles) {
+ LOG.warn("Reading log file: {}, to build column range metadata.",
fullFilePath);
+ return getLogFileColumnRangeMetadata(fullFilePath.toString(),
datasetMetaClient, columnsToIndex, writerSchemaOpt);
}
LOG.warn("Column range index not supported for: {}", filePath);
@@ -1241,9 +1235,41 @@ public class HoodieTableMetadataUtil {
}
}
+ /**
+ * Read column range metadata from log file.
+ */
+ @VisibleForTesting
+ protected static List<HoodieColumnRangeMetadata<Comparable>>
getLogFileColumnRangeMetadata(String filePath,
+
HoodieTableMetaClient datasetMetaClient,
+
List<String> columnsToIndex,
+
Option<Schema> writerSchemaOpt) {
+ if (writerSchemaOpt.isPresent()) {
+ List<Schema.Field> fieldsToIndex =
writerSchemaOpt.get().getFields().stream()
+ .filter(field -> columnsToIndex.contains(field.name()))
+ .collect(Collectors.toList());
+ // read log file records without merging
+ List<HoodieRecord> records = new ArrayList<>();
+ HoodieUnMergedLogRecordScanner scanner =
HoodieUnMergedLogRecordScanner.newBuilder()
+ .withStorage(datasetMetaClient.getStorage())
+ .withBasePath(datasetMetaClient.getBasePath())
+ .withLogFilePaths(Collections.singletonList(filePath))
+ .withBufferSize(MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
+
.withLatestInstantTime(datasetMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().getTimestamp())
+ .withReaderSchema(writerSchemaOpt.get())
+ .withTableMetaClient(datasetMetaClient)
+ .withLogRecordScannerCallback(records::add)
+ .build();
+ scanner.scan(false);
+ Map<String, HoodieColumnRangeMetadata<Comparable>>
columnRangeMetadataMap =
+ collectColumnRangeMetadata(records, fieldsToIndex, filePath,
writerSchemaOpt.get());
+ return new ArrayList<>(columnRangeMetadataMap.values());
+ }
+ return Collections.emptyList();
+ }
+
/**
* Does an upcast for {@link BigDecimal} instance to align it with
scale/precision expected by
- * the {@link org.apache.avro.LogicalTypes.Decimal} Avro logical type
+ * the {@link LogicalTypes.Decimal} Avro logical type
*/
public static BigDecimal tryUpcastDecimal(BigDecimal value, final
LogicalTypes.Decimal decimal) {
final int scale = decimal.getScale();
@@ -1810,7 +1836,7 @@ public class HoodieTableMetadataUtil {
final FileSlice fileSlice = partitionAndBaseFile.getValue();
if (!fileSlice.getBaseFile().isPresent()) {
List<String> logFilePaths =
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
- .map(l -> l.getPath().toString()).collect(toList());
+ .map(l -> l.getPath().toString()).collect(Collectors.toList());
HoodieMergedLogRecordScanner mergedLogRecordScanner =
HoodieMergedLogRecordScanner.newBuilder()
.withStorage(metaClient.getStorage())
.withBasePath(basePath)
@@ -1921,7 +1947,7 @@ public class HoodieTableMetadataUtil {
return engineContext.parallelize(partitionFileSlicePairs,
parallelism).flatMap(partitionAndBaseFile -> {
final String partition = partitionAndBaseFile.getKey();
final FileSlice fileSlice = partitionAndBaseFile.getValue();
- List<String> logFilePaths =
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(l ->
l.getPath().toString()).collect(toList());
+ List<String> logFilePaths =
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(l ->
l.getPath().toString()).collect(Collectors.toList());
Option<StoragePath> dataFilePath =
Option.ofNullable(fileSlice.getBaseFile().map(baseFile -> filePath(basePath,
partition, baseFile.getFileName())).orElseGet(null));
Schema readerSchema;
if (dataFilePath.isPresent()) {
@@ -2042,7 +2068,7 @@ public class HoodieTableMetadataUtil {
private static Stream<HoodieRecord> collectAndProcessColumnMetadata(
List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata,
- String partitionPath) {
+ String partitionPath, boolean isTightBound) {
// Step 1: Flatten and Group by Column Name
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnMetadataMap
= fileColumnMetadata.stream()
@@ -2054,17 +2080,19 @@ public class HoodieTableMetadataUtil {
.map(entry ->
FileFormatUtils.getColumnRangeInPartition(partitionPath, entry.getValue()));
// Create Partition Stats Records
- return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath,
partitionStatsRangeMetadata.collect(Collectors.toList()), false);
+ return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath,
partitionStatsRangeMetadata.collect(Collectors.toList()), false, isTightBound);
}
public static HoodieData<HoodieRecord>
convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
List<DirectoryInfo> partitionInfoList,
HoodieMetadataConfig metadataConfig,
-
HoodieTableMetaClient dataTableMetaClient) {
+
HoodieTableMetaClient dataTableMetaClient,
+
Option<Schema> writerSchemaOpt) {
+ Lazy<Option<Schema>> lazyWriterSchemaOpt = writerSchemaOpt.isPresent() ?
Lazy.eagerly(writerSchemaOpt) : Lazy.lazily(() ->
tryResolveSchemaForTable(dataTableMetaClient));
final List<String> columnsToIndex = getColumnsToIndex(
metadataConfig.isPartitionStatsIndexEnabled(),
metadataConfig.getColumnsEnabledForColumnStatsIndex(),
- Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
+ lazyWriterSchemaOpt);
if (columnsToIndex.isEmpty()) {
LOG.warn("No columns to index for partition stats index");
return engineContext.emptyHoodieData();
@@ -2072,14 +2100,15 @@ public class HoodieTableMetadataUtil {
LOG.debug("Indexing following columns for partition stats index: {}",
columnsToIndex);
// Create records for MDT
int parallelism = Math.max(Math.min(partitionInfoList.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+ Option<Schema> writerSchema = lazyWriterSchemaOpt.get();
return engineContext.parallelize(partitionInfoList,
parallelism).flatMap(partitionInfo -> {
final String partitionPath = partitionInfo.getRelativePath();
// Step 1: Collect Column Metadata for Each File
List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata =
partitionInfo.getFileNameToSizeMap().keySet().stream()
- .map(fileName -> getFileStatsRangeMetadata(partitionPath,
partitionPath + "/" + fileName, dataTableMetaClient, columnsToIndex, false))
- .collect(Collectors.toList());
+ .map(fileName -> getFileStatsRangeMetadata(partitionPath,
partitionPath + "/" + fileName, dataTableMetaClient, columnsToIndex, false,
true, writerSchemaOpt))
+ .collect(Collectors.toList());
- return collectAndProcessColumnMetadata(fileColumnMetadata,
partitionPath).iterator();
+ return collectAndProcessColumnMetadata(fileColumnMetadata,
partitionPath, true).iterator();
});
}
@@ -2087,7 +2116,9 @@ public class HoodieTableMetadataUtil {
String filePath,
HoodieTableMetaClient datasetMetaClient,
List<String> columnsToIndex,
-
boolean isDeleted) {
+
boolean isDeleted,
+
boolean shouldReadColumnMetadataForLogFiles,
+
Option<Schema> writerSchemaOpt) {
String filePartitionPath = filePath.startsWith("/") ?
filePath.substring(1) : filePath;
String fileName = FSUtils.getFileName(filePath, partitionPath);
if (isDeleted) {
@@ -2095,7 +2126,7 @@ public class HoodieTableMetadataUtil {
.map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
.collect(Collectors.toList());
}
- return readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex);
+ return readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex, shouldReadColumnMetadataForLogFiles, writerSchemaOpt);
}
public static HoodieData<HoodieRecord>
convertMetadataToPartitionStatsRecords(HoodieCommitMetadata commitMetadata,
@@ -2117,7 +2148,8 @@ public class HoodieTableMetadataUtil {
: Option.of(new Schema.Parser().parse(writerSchemaStr)));
HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
Option<Schema> tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
- List<String> columnsToIndex =
getColumnsToIndex(metadataConfig.isPartitionStatsIndexEnabled(),
metadataConfig.getColumnsEnabledForColumnStatsIndex(),
Lazy.eagerly(tableSchema));
+ Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
+ List<String> columnsToIndex =
getColumnsToIndex(metadataConfig.isPartitionStatsIndexEnabled(),
metadataConfig.getColumnsEnabledForColumnStatsIndex(), writerSchemaOpt);
if (columnsToIndex.isEmpty()) {
return engineContext.emptyHoodieData();
}
@@ -2130,29 +2162,78 @@ public class HoodieTableMetadataUtil {
.collect(Collectors.toList());
int parallelism = Math.max(Math.min(partitionedWriteStats.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+ boolean shouldScanColStatsForTightBound =
MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient)
+ && metadataConfig.isPartitionStatsIndexTightBoundEnabled() &&
WriteOperationType.isPartitionStatsTightBoundRequired(commitMetadata.getOperationType());
+ HoodieTableMetadata tableMetadata;
+ if (shouldScanColStatsForTightBound) {
+ tableMetadata = HoodieTableMetadata.create(engineContext,
dataMetaClient.getStorage(), metadataConfig,
dataMetaClient.getBasePath().toString());
+ } else {
+ tableMetadata = null;
+ }
return engineContext.parallelize(partitionedWriteStats,
parallelism).flatMap(partitionedWriteStat -> {
final String partitionName =
partitionedWriteStat.get(0).getPartitionPath();
- // Step 1: Collect Column Metadata for Each File
+ // Step 1: Collect Column Metadata for Each File part of current
commit metadata
List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata =
partitionedWriteStat.stream()
- .map(writeStat -> translateWriteStatToFileStats(writeStat,
dataMetaClient, columnsToIndex))
- .collect(Collectors.toList());
+ .map(writeStat -> translateWriteStatToFileStats(writeStat,
dataMetaClient, columnsToIndex, tableSchema))
+ .collect(Collectors.toList());
+ if (shouldScanColStatsForTightBound) {
+ checkState(tableMetadata != null, "tableMetadata should not be null
when scanning metadata table");
+ // Collect Column Metadata for Each File part of active file system
view of latest snapshot
+ // Get all file names, including log files, in a set from the file
slices
+ Set<String> fileNames = getPartitionFileSlices(dataMetaClient,
Option.empty(), partitionName, true).stream()
+ .flatMap(fileSlice -> Stream.concat(
+
Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
+ fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ // Fetch metadata table COLUMN_STATS partition records for above
files
+ List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata =
+
tableMetadata.getRecordsByKeyPrefixes(generateKeyPrefixes(columnsToIndex,
partitionName), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false)
+ // schema and properties are ignored in getInsertValue, so
simply pass as null
+ .map(record -> record.getData().getInsertValue(null, null))
+ .filter(Option::isPresent)
+ .map(data -> ((HoodieMetadataRecord)
data.get()).getColumnStatsMetadata())
+ .filter(stats -> fileNames.contains(stats.getFileName()))
+ .map(HoodieColumnRangeMetadata::fromColumnStats)
+ .collectAsList();
+          // in case of shouldScanColStatsForTightBound = true, we compute
stats for the partition of interest for all files from getLatestFileSlice()
excluding the current commit here
+          // fileColumnMetadata already contains stats for files from the
current inflight commit, so we are adding both together and sending it to
collectAndProcessColumnMetadata
+ fileColumnMetadata.add(partitionColumnMetadata);
+ }
- return collectAndProcessColumnMetadata(fileColumnMetadata,
partitionName).iterator();
+ return collectAndProcessColumnMetadata(fileColumnMetadata,
partitionName, shouldScanColStatsForTightBound).iterator();
});
} catch (Exception e) {
throw new HoodieException("Failed to generate column stats records for
metadata table", e);
}
}
+ /**
+ * Generate key prefixes for each combination of column name in {@param
columnsToIndex} and {@param partitionName}.
+ */
+ private static List<String> generateKeyPrefixes(List<String> columnsToIndex,
String partitionName) {
+ List<String> keyPrefixes = new ArrayList<>();
+ PartitionIndexID partitionIndexId = new
PartitionIndexID(getColumnStatsIndexPartitionIdentifier(partitionName));
+ for (String columnName : columnsToIndex) {
+ ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
+ String keyPrefix = columnIndexID.asBase64EncodedString()
+ .concat(partitionIndexId.asBase64EncodedString());
+ keyPrefixes.add(keyPrefix);
+ }
+
+ return keyPrefixes;
+ }
+
private static List<HoodieColumnRangeMetadata<Comparable>>
translateWriteStatToFileStats(HoodieWriteStat writeStat,
HoodieTableMetaClient datasetMetaClient,
-
List<String> columnsToIndex) {
+
List<String> columnsToIndex,
+
Option<Schema> writerSchemaOpt) {
if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat)
writeStat).getColumnStats().isPresent()) {
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap =
((HoodieDeltaWriteStat) writeStat).getColumnStats().get();
return columnRangeMap.values().stream().collect(Collectors.toList());
}
- return getFileStatsRangeMetadata(writeStat.getPartitionPath(),
writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
+ return getFileStatsRangeMetadata(writeStat.getPartitionPath(),
writeStat.getPath(), datasetMetaClient, columnsToIndex, false, false,
writerSchemaOpt);
}
public static String getPartitionStatsIndexKey(String partitionPath, String
columnName) {
@@ -2165,7 +2246,10 @@ public class HoodieTableMetadataUtil {
public static HoodieMetadataColumnStats
mergeColumnStatsRecords(HoodieMetadataColumnStats prevColumnStats,
HoodieMetadataColumnStats newColumnStats) {
checkArgument(Objects.equals(prevColumnStats.getColumnName(),
newColumnStats.getColumnName()));
-
+ // If new column stats is tight bound, then discard the previous column
stats
+ if (newColumnStats.getIsTightBound()) {
+ return newColumnStats;
+ }
// We're handling 2 cases in here
// - New record is a tombstone: in this case it simply overwrites
previous state
// - Previous record is a tombstone: in that case new proper record would
also
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
index b297b627ba3..2eaf5ef2e7e 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
@@ -48,7 +48,8 @@ public class TestAvroSchemaConverter {
+ "`nullCount` BIGINT, "
+ "`totalSize` BIGINT, "
+ "`totalUncompressedSize` BIGINT, "
- + "`isDeleted` BOOLEAN NOT NULL>";
+ + "`isDeleted` BOOLEAN NOT NULL, "
+ + "`isTightBound` BOOLEAN NOT NULL>";
assertThat(dataType.getChildren().get(pos).toString(), is(expected));
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 12068b25708..044a31ad1fa 100755
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -19,7 +19,6 @@
package org.apache.hudi.common.functional;
import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieArchivedLogFile;
@@ -43,15 +42,12 @@ import
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.LogReaderUtils;
import org.apache.hudi.common.table.log.TestLogReaderUtils;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
-import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
-import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HadoopMapRedUtils;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
@@ -112,8 +108,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
-import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
import static org.apache.hudi.common.testutils.HoodieTestUtils.getJavaVersion;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.shouldUseExternalHdfs;
import static org.apache.hudi.common.testutils.HoodieTestUtils.useExternalHdfs;
@@ -136,7 +130,6 @@ import static org.mockito.Mockito.when;
@SuppressWarnings("Duplicates")
public class TestHoodieLogFormat extends HoodieCommonTestHarness {
- private static final HoodieLogBlockType DEFAULT_DATA_BLOCK_TYPE =
HoodieLogBlockType.AVRO_DATA_BLOCK;
private static final int BUFFER_SIZE = 4096;
private static HdfsTestService hdfsTestService;
@@ -692,7 +685,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0,
400);
- Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema,
genRecords, 4);
+ Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema,
genRecords, 4, storage);
FileCreateUtils.createDeltaCommit(basePath, "100", storage);
// scan all log blocks (across multiple log files)
@@ -735,7 +728,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0,
300);
- Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema,
genRecords, 3);
+ Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema,
genRecords, 3, storage);
FileCreateUtils.createDeltaCommit(basePath, "100", storage);
@@ -823,7 +816,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
SchemaTestUtil testUtil = new SchemaTestUtil();
List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0,
300);
- Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema,
genRecords, 3);
+ Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema,
genRecords, 3, storage);
FileCreateUtils.createDeltaCommit(basePath, "100", storage);
@@ -2780,27 +2773,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
}
}
- public static HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType,
List<IndexedRecord> records,
- Map<HeaderMetadataType, String>
header) {
- return getDataBlock(dataBlockType,
records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()),
header, new StoragePath("dummy_path"));
- }
-
- private static HoodieDataBlock getDataBlock(HoodieLogBlockType
dataBlockType, List<HoodieRecord> records,
- Map<HeaderMetadataType, String>
header, StoragePath pathForReader) {
- switch (dataBlockType) {
- case CDC_DATA_BLOCK:
- return new HoodieCDCDataBlock(records, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
- case AVRO_DATA_BLOCK:
- return new HoodieAvroDataBlock(records, false, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
- case HFILE_DATA_BLOCK:
- return new HoodieHFileDataBlock(records, header,
HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), pathForReader,
HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
- case PARQUET_DATA_BLOCK:
- return new HoodieParquetDataBlock(records, false, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD,
PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
- default:
- throw new RuntimeException("Unknown data block type " + dataBlockType);
- }
- }
-
private static Stream<Arguments> testArguments() {
// Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled,
Arg3: enableOptimizedLogBlocksScan
return Stream.of(
@@ -2825,58 +2797,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
);
}
- private static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
- Schema schema,
- List<IndexedRecord> records,
- int numFiles)
- throws IOException, InterruptedException {
- return writeLogFiles(partitionPath, schema, records, numFiles, false);
- }
-
- private static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
- Schema schema,
- List<IndexedRecord> records,
- int numFiles,
- boolean
enableBlockSequenceNumbers)
- throws IOException, InterruptedException {
- int blockSeqNo = 0;
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withSizeThreshold(1024).withFileId("test-fileid1").withDeltaCommit("100")
- .withStorage(storage).build();
- if (storage.exists(writer.getLogFile().getPath())) {
- // enable append for reader test.
- ((HoodieLogFormatWriter) writer).withOutputStream(
- (FSDataOutputStream) storage.append(writer.getLogFile().getPath()));
- }
- Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
- header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
- header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-
- Set<HoodieLogFile> logFiles = new HashSet<>();
-
- // Create log files
- int recordsPerFile = records.size() / numFiles;
- int filesWritten = 0;
-
- while (filesWritten < numFiles) {
- int targetRecordsCount = filesWritten == numFiles - 1
- ? recordsPerFile + (records.size() % recordsPerFile)
- : recordsPerFile;
- int offset = filesWritten * recordsPerFile;
- List<IndexedRecord> targetRecords = records.subList(offset, offset +
targetRecordsCount);
-
- logFiles.add(writer.getLogFile());
- writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, targetRecords,
header));
- filesWritten++;
- }
-
- writer.close();
-
- return logFiles;
- }
-
/**
* Utility to convert the given iterator to a List.
*/
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index be25eb4bfb2..c4727688546 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -18,26 +18,47 @@
package org.apache.hudi.common.testutils;
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -46,6 +67,9 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
+
/**
* The common hoodie test harness to provide the basic infrastructure.
*/
@@ -53,6 +77,7 @@ public class HoodieCommonTestHarness {
private static final Logger LOG =
LoggerFactory.getLogger(HoodieCommonTestHarness.class);
protected static final String BASE_FILE_EXTENSION =
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
protected static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
null;
+ protected static final HoodieLogBlock.HoodieLogBlockType
DEFAULT_DATA_BLOCK_TYPE = HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK;
protected String tableName;
protected String basePath;
@@ -242,4 +267,105 @@ public class HoodieCommonTestHarness {
.collect(Collectors.toList());
return !pendingInstants.isEmpty();
}
+
+ protected static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
+ Schema schema,
+ List<HoodieRecord> records,
+ int numFiles,
+ HoodieStorage storage,
+ Properties props,
+ String fileId,
+ String commitTime)
+ throws IOException, InterruptedException {
+ List<IndexedRecord> indexedRecords = records.stream()
+ .map(record -> {
+ try {
+ return record.toIndexedRecord(schema, props);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .filter(Option::isPresent)
+ .map(Option::get)
+ .map(HoodieRecord::getData)
+ .collect(Collectors.toList());
+ return writeLogFiles(partitionPath, schema, indexedRecords, numFiles,
false, storage, fileId, commitTime);
+ }
+
+ protected static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
+ Schema schema,
+ List<IndexedRecord>
records,
+ int numFiles,
+ HoodieStorage storage)
+ throws IOException, InterruptedException {
+ return writeLogFiles(partitionPath, schema, records, numFiles, false,
storage, "test-fileid1", "100");
+ }
+
+ protected static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
+ Schema schema,
+ List<IndexedRecord>
records,
+ int numFiles,
+ boolean
enableBlockSequenceNumbers,
+ HoodieStorage storage,
+ String fileId,
+ String commitTime)
+ throws IOException, InterruptedException {
+ int blockSeqNo = 0;
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+
.withSizeThreshold(1024).withFileId(fileId).withDeltaCommit(commitTime)
+ .withStorage(storage).build();
+ if (storage.exists(writer.getLogFile().getPath())) {
+ // enable append for reader test.
+ ((HoodieLogFormatWriter) writer).withOutputStream(
+ (FSDataOutputStream) storage.append(writer.getLogFile().getPath()));
+ }
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+ header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+
+ Set<HoodieLogFile> logFiles = new HashSet<>();
+
+ // Create log files
+ int recordsPerFile = records.size() / numFiles;
+ int filesWritten = 0;
+
+ while (filesWritten < numFiles) {
+ int targetRecordsCount = filesWritten == numFiles - 1
+ ? recordsPerFile + (records.size() % recordsPerFile)
+ : recordsPerFile;
+ int offset = filesWritten * recordsPerFile;
+ List<IndexedRecord> targetRecords = records.subList(offset, offset +
targetRecordsCount);
+
+ logFiles.add(writer.getLogFile());
+ writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, targetRecords,
header));
+ filesWritten++;
+ }
+
+ writer.close();
+
+ return logFiles;
+ }
+
+ public static HoodieDataBlock getDataBlock(HoodieLogBlock.HoodieLogBlockType
dataBlockType, List<IndexedRecord> records,
+
Map<HoodieLogBlock.HeaderMetadataType, String> header) {
+ return getDataBlock(dataBlockType,
records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()),
header, new StoragePath("dummy_path"));
+ }
+
+ private static HoodieDataBlock
getDataBlock(HoodieLogBlock.HoodieLogBlockType dataBlockType,
List<HoodieRecord> records,
+
Map<HoodieLogBlock.HeaderMetadataType, String> header, StoragePath
pathForReader) {
+ switch (dataBlockType) {
+ case CDC_DATA_BLOCK:
+ return new HoodieCDCDataBlock(records, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ case AVRO_DATA_BLOCK:
+ return new HoodieAvroDataBlock(records, false, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ case HFILE_DATA_BLOCK:
+ return new HoodieHFileDataBlock(records, header,
HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), pathForReader,
HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
+ case PARQUET_DATA_BLOCK:
+ return new HoodieParquetDataBlock(records, false, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD,
PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
+ default:
+ throw new RuntimeException("Unknown data block type " + dataBlockType);
+ }
+ }
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
index 5e86a6b6de4..7fcc0d16193 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
@@ -225,17 +225,17 @@ public class TestHoodieMetadataPayload extends
HoodieCommonTestHarness {
HoodieColumnRangeMetadata<Comparable> fileColumnRange1 =
HoodieColumnRangeMetadata.<Comparable>create(
"path/to/file", "columnName", 1, 5, 0, 10, 100, 200);
HoodieRecord<HoodieMetadataPayload> firstPartitionStatsRecord =
- HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME,
Collections.singletonList(fileColumnRange1), false).findFirst().get();
+ HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME,
Collections.singletonList(fileColumnRange1), false, false).findFirst().get();
HoodieColumnRangeMetadata<Comparable> fileColumnRange2 =
HoodieColumnRangeMetadata.<Comparable>create(
"path/to/file", "columnName", 3, 8, 1, 15, 120, 250);
HoodieRecord<HoodieMetadataPayload> updatedPartitionStatsRecord =
- HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME,
Collections.singletonList(fileColumnRange2), false).findFirst().get();
+ HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME,
Collections.singletonList(fileColumnRange2), false, false).findFirst().get();
HoodieMetadataPayload combinedPartitionStatsRecordPayload =
updatedPartitionStatsRecord.getData().preCombine(firstPartitionStatsRecord.getData());
HoodieColumnRangeMetadata<Comparable> expectedColumnRange =
HoodieColumnRangeMetadata.<Comparable>create(
"path/to/file", "columnName", 1, 8, 1, 25, 220, 450);
HoodieMetadataPayload expectedColumnRangeMetadata =
(HoodieMetadataPayload) HoodieMetadataPayload.createPartitionStatsRecords(
- PARTITION_NAME, Collections.singletonList(expectedColumnRange),
false).findFirst().get().getData();
+ PARTITION_NAME, Collections.singletonList(expectedColumnRange), false,
false).findFirst().get().getData();
assertEquals(expectedColumnRangeMetadata,
combinedPartitionStatsRecordPayload);
}
@@ -244,19 +244,19 @@ public class TestHoodieMetadataPayload extends
HoodieCommonTestHarness {
HoodieColumnRangeMetadata<Comparable> fileColumnRange1 =
HoodieColumnRangeMetadata.<Comparable>create(
"path/to/file", "columnName", 1, 5, 0, 10, 100, 200);
HoodieRecord<HoodieMetadataPayload> firstPartitionStatsRecord =
- HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME,
Collections.singletonList(fileColumnRange1), false).findFirst().get();
+ HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME,
Collections.singletonList(fileColumnRange1), false, false).findFirst().get();
HoodieColumnRangeMetadata<Comparable> fileColumnRange2 =
HoodieColumnRangeMetadata.<Comparable>create(
"path/to/file", "columnName", 3, 8, 1, 15, 120, 250);
// create delete payload
HoodieRecord<HoodieMetadataPayload> deletedPartitionStatsRecord =
- HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME,
Collections.singletonList(fileColumnRange2), true).findFirst().get();
+ HoodieMetadataPayload.createPartitionStatsRecords(PARTITION_NAME,
Collections.singletonList(fileColumnRange2), true, false).findFirst().get();
// deleted (or tombstone) record will be therefore deleting previous state
of the record
HoodieMetadataPayload combinedPartitionStatsRecordPayload =
deletedPartitionStatsRecord.getData().preCombine(firstPartitionStatsRecord.getData());
HoodieColumnRangeMetadata<Comparable> expectedColumnRange =
HoodieColumnRangeMetadata.<Comparable>create(
"path/to/file", "columnName", 3, 8, 1, 15, 120, 250);
HoodieMetadataPayload expectedColumnRangeMetadata =
(HoodieMetadataPayload) HoodieMetadataPayload.createPartitionStatsRecords(
- PARTITION_NAME, Collections.singletonList(expectedColumnRange),
true).findFirst().get().getData();
+ PARTITION_NAME, Collections.singletonList(expectedColumnRange), true,
false).findFirst().get().getData();
assertEquals(expectedColumnRangeMetadata,
combinedPartitionStatsRecordPayload);
// another update for the same key should overwrite the delete record
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 83b6abe12e5..4ccc48b519d 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -25,12 +25,16 @@ import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
@@ -47,6 +51,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -144,25 +149,10 @@ public class TestHoodieTableMetadataUtil extends
HoodieCommonTestHarness {
.withColumnStatsIndexForColumns("rider,driver")
.withPartitionStatsIndexParallelism(1)
.build(),
- metaClient);
+ metaClient,
+ Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS));
// Validate the result.
- List<HoodieRecord> records = result.collectAsList();
- // 3 partitions * 2 columns = 6 partition stats records
- assertEquals(6, records.size());
- assertEquals(MetadataPartitionType.PARTITION_STATS.getPartitionPath(),
records.get(0).getPartitionPath());
- ((HoodieMetadataPayload)
result.collectAsList().get(0).getData()).getColumnStatMetadata().get().getColumnName();
- records.forEach(r -> {
- HoodieMetadataPayload payload = (HoodieMetadataPayload) r.getData();
- assertTrue(payload.getColumnStatMetadata().isPresent());
- // instant1 < instant2 so instant1 should be in the min value and
instant2 should be in the max value.
- if
(payload.getColumnStatMetadata().get().getColumnName().equals("rider")) {
- assertEquals(String.format("{\"value\": \"rider-%s\"}", instant1),
String.valueOf(payload.getColumnStatMetadata().get().getMinValue()));
- assertEquals(String.format("{\"value\": \"rider-%s\"}", instant2),
String.valueOf(payload.getColumnStatMetadata().get().getMaxValue()));
- } else if
(payload.getColumnStatMetadata().get().getColumnName().equals("driver")) {
- assertEquals(String.format("{\"value\": \"driver-%s\"}", instant1),
String.valueOf(payload.getColumnStatMetadata().get().getMinValue()));
- assertEquals(String.format("{\"value\": \"driver-%s\"}", instant2),
String.valueOf(payload.getColumnStatMetadata().get().getMaxValue()));
- }
- });
+ validatePartitionStats(result, instant1, instant2);
}
@Test
@@ -212,6 +202,90 @@ public class TestHoodieTableMetadataUtil extends
HoodieCommonTestHarness {
}
}
+ @Test
+ public void testGetLogFileColumnRangeMetadata() throws Exception {
+ HoodieLocalEngineContext engineContext = new
HoodieLocalEngineContext(metaClient.getStorageConf());
+ String instant1 = "20230918120000000";
+ hoodieTestTable = hoodieTestTable.addCommit(instant1);
+ String instant2 = "20230918121110000";
+ hoodieTestTable = hoodieTestTable.addCommit(instant2);
+ List<HoodieTableMetadataUtil.DirectoryInfo> partitionInfoList = new
ArrayList<>();
+ List<String> columnsToIndex = Arrays.asList("rider", "driver");
+ // Generate 10 inserts for each partition and populate
partitionBaseFilePairs and recordKeys.
+ DATE_PARTITIONS.forEach(p -> {
+ try {
+ URI partitionMetaFile =
FileCreateUtils.createPartitionMetaFile(basePath, p);
+ StoragePath partitionMetadataPath = new StoragePath(partitionMetaFile);
+ String fileId1 = UUID.randomUUID().toString();
+ // add only one parquet file in first file slice
+ FileSlice fileSlice1 = new FileSlice(p, instant1, fileId1);
+ StoragePath storagePath1 = new
StoragePath(hoodieTestTable.getBaseFilePath(p, fileId1).toUri());
+ writeParquetFile(instant1, storagePath1,
dataGen.generateInsertsForPartition(instant1, 10, p), metaClient,
engineContext);
+ HoodieBaseFile baseFile1 = new
HoodieBaseFile(hoodieTestTable.getBaseFilePath(p, fileId1).toString());
+ fileSlice1.setBaseFile(baseFile1);
+ // add log file in second file slice with higher rider and driver
values (which are concatenated with instant)
+ FileSlice fileSlice2 = new FileSlice(p, instant2, fileId1);
+ fileSlice2.setBaseFile(baseFile1);
+ StoragePath storagePath2 = new
StoragePath(partitionMetadataPath.getParent(),
hoodieTestTable.getLogFileNameById(fileId1, 1));
+ writeLogFiles(new StoragePath(metaClient.getBasePath(), p),
HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS,
dataGen.generateInsertsForPartition(instant2, 10, p), 1,
+ metaClient.getStorage(), new Properties(), fileId1, instant2);
+ fileSlice2.addLogFile(new
HoodieLogFile(storagePath2.toUri().toString()));
+ partitionInfoList.add(new HoodieTableMetadataUtil.DirectoryInfo(
+ p,
+
metaClient.getStorage().listDirectEntries(Arrays.asList(partitionMetadataPath,
storagePath1, storagePath2)),
+ instant2,
+ Collections.emptySet()));
+ // NOTE: we need to set table config as we are not using write client
explicitly and these configs are needed for log record reader
+
metaClient.getTableConfig().setValue(HoodieTableConfig.POPULATE_META_FIELDS.key(),
"false");
+
metaClient.getTableConfig().setValue(HoodieTableConfig.RECORDKEY_FIELDS.key(),
"_row_key");
+
metaClient.getTableConfig().setValue(HoodieTableConfig.PARTITION_FIELDS.key(),
"partition_path");
+ List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataLogFile
= HoodieTableMetadataUtil.getLogFileColumnRangeMetadata(
+ storagePath2.toString(),
+ metaClient,
+ columnsToIndex,
+
Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS));
+ // there must be two ranges for rider and driver
+ assertEquals(2, columnRangeMetadataLogFile.size());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ // collect partition stats, this will collect stats for log files as well
+ HoodieData<HoodieRecord> result =
HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(
+ engineContext,
+ partitionInfoList,
+ HoodieMetadataConfig.newBuilder().enable(true)
+ .withMetadataIndexColumnStats(true)
+ .withMetadataIndexPartitionStats(true)
+ .withColumnStatsIndexForColumns("rider,driver")
+ .withPartitionStatsIndexParallelism(1)
+ .build(),
+ metaClient,
+ Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS));
+ // Validate the result.
+ validatePartitionStats(result, instant1, instant2);
+ }
+
+ private static void validatePartitionStats(HoodieData<HoodieRecord> result,
String instant1, String instant2) {
+ List<HoodieRecord> records = result.collectAsList();
+ // 3 partitions * 2 columns = 6 partition stats records
+ assertEquals(6, records.size());
+ assertEquals(MetadataPartitionType.PARTITION_STATS.getPartitionPath(),
records.get(0).getPartitionPath());
+ ((HoodieMetadataPayload)
result.collectAsList().get(0).getData()).getColumnStatMetadata().get().getColumnName();
+ records.forEach(r -> {
+ HoodieMetadataPayload payload = (HoodieMetadataPayload) r.getData();
+ assertTrue(payload.getColumnStatMetadata().isPresent());
+ // instant1 < instant2 so instant1 should be in the min value and
instant2 should be in the max value.
+ if
(payload.getColumnStatMetadata().get().getColumnName().equals("rider")) {
+ assertEquals(String.format("{\"value\": \"rider-%s\"}", instant1),
String.valueOf(payload.getColumnStatMetadata().get().getMinValue()));
+ assertEquals(String.format("{\"value\": \"rider-%s\"}", instant2),
String.valueOf(payload.getColumnStatMetadata().get().getMaxValue()));
+ } else if
(payload.getColumnStatMetadata().get().getColumnName().equals("driver")) {
+ assertEquals(String.format("{\"value\": \"driver-%s\"}", instant1),
String.valueOf(payload.getColumnStatMetadata().get().getMinValue()));
+ assertEquals(String.format("{\"value\": \"driver-%s\"}", instant2),
String.valueOf(payload.getColumnStatMetadata().get().getMaxValue()));
+ }
+ });
+ }
+
private static void writeParquetFile(String instant,
StoragePath path,
List<HoodieRecord> records,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index 72cc75f6799..8609c87d128 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -236,36 +236,6 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
}
}
- test(s"Test partition stats index on int type field with update and file
pruning") {
- Seq("cow", "mor").foreach { tableType =>
- withTempDir { tmp =>
- val tableName = generateTableName
- val tablePath = s"${tmp.getCanonicalPath}/$tableName"
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price int,
- | ts long
- |) using hudi
- |partitioned by (ts)
- |tblproperties (
- | type = '$tableType',
- | primaryKey = 'id',
- | preCombineField = 'price',
- | hoodie.metadata.index.partition.stats.enable = 'true',
- | hoodie.metadata.index.column.stats.column.list = 'price'
- |)
- |location '$tablePath'
- |""".stripMargin
- )
-
- writeAndValidatePartitionStats(tableName, tablePath)
- }
- }
- }
-
test(s"Test partition stats index without configuring columns to index") {
Seq("cow", "mor").foreach { tableType =>
withTempDir { tmp =>
@@ -308,6 +278,55 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
}
}
+ test(s"Test partition stats index on int type field with update and file
pruning") {
+ Seq("cow", "mor").foreach { tableType =>
+ Seq(true, false).foreach { shouldCompact =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price int,
+ | ts long
+ |) using hudi
+ |partitioned by (ts)
+ |tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'price',
+ | hoodie.metadata.index.partition.stats.enable = 'true',
+ | hoodie.metadata.index.column.stats.enable = 'true',
+ | hoodie.metadata.index.column.stats.column.list = 'price'
+ |)
+ |location '$tablePath'
+ |""".stripMargin
+ )
+
+ // trigger compaction after update and validate stats
+ if (tableType == "mor" && shouldCompact) {
+ spark.sql("set hoodie.compact.inline=true")
+ spark.sql("set hoodie.compact.inline.max.delta.commits=2")
+ }
+ spark.sql("set hoodie.metadata.enable=true")
+ spark.sql("set hoodie.enable.data.skipping=true")
+ spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+ writeAndValidatePartitionStats(tableName, tablePath)
+ if (tableType == "mor" && shouldCompact) {
+ // check partition stats records with tightBound
+          checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
+            Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000, false),
+            Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000, false),
+            Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4001, false)
+          )
+ }
+ }
+ }
+ }
+ }
+
  private def writeAndValidatePartitionStats(tableName: String, tablePath: String): Unit = {
spark.sql(
s"""
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 9cd840df5a3..73ea18b6139 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1009,7 +1009,8 @@ public class HoodieMetadataTableValidator implements Serializable {
.getSortedColumnStatsList(partitionPath, latestFileNames);
     TreeSet<HoodieColumnRangeMetadata<Comparable>> aggregatedColumnStats = aggregateColumnStats(partitionPath, colStats);
-    List<HoodieRecord> partitionStatRecords = HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, new ArrayList<>(aggregatedColumnStats), false)
+    // TODO: fix `isTightBound` flag when stats based on log files are available
+    List<HoodieRecord> partitionStatRecords = HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, new ArrayList<>(aggregatedColumnStats), false, false)
.collect(Collectors.toList());
return partitionStatRecords.stream()
.map(record -> {
@@ -1719,7 +1720,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
return allColumnNameList.stream()
         .flatMap(columnName -> tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream()
-            .map(HoodieTableMetadataUtil::convertColumnStatsRecordToColumnRangeMetadata)
+            .map(HoodieColumnRangeMetadata::fromColumnStats)
.collect(Collectors.toList())
.stream())
.sorted(new HoodieColumnRangeMetadataComparator())