This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 588e01114ce [HUDI-8371] Fix column stats index with MDT for few
scenarios (#12105)
588e01114ce is described below
commit 588e01114ce3a1cad02c0d229e8919a29ee88770
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Oct 27 19:14:09 2024 -0700
[HUDI-8371] Fix column stats index with MDT for few scenarios (#12105)
---
.../metadata/HoodieBackedTableMetadataWriter.java | 34 +-
.../java/org/apache/hudi/common/fs/FSUtils.java | 8 +
.../table/log/HoodieUnMergedLogRecordScanner.java | 2 +-
.../hudi/metadata/HoodieBackedTableMetadata.java | 39 +-
.../hudi/metadata/HoodieMetadataPayload.java | 14 +-
.../hudi/metadata/HoodieTableMetadataUtil.java | 107 ++---
.../org/apache/hudi/common/fs/TestFSUtils.java | 4 +
.../hudi/metadata/TestHoodieMetadataPayload.java | 28 ++
.../hudi/metadata/TestHoodieTableMetadataUtil.java | 12 +-
.../hudi/testutils/LogFileColStatsTestUtil.java | 96 +++++
...otstrap-rollback1-column-stats-index-table.json | 2 +
.../cow-bootstrap1-column-stats-index-table.json | 4 +
.../cow-bootstrap2-column-stats-index-table.json | 5 +
.../cow-clean1-column-stats-index-table.json | 2 +
...-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json | 1 +
...otstrap-rollback1-column-stats-index-table.json | 2 +
.../mor-bootstrap1-column-stats-index-table.json | 3 +
.../mor-bootstrap2-column-stats-index-table.json | 5 +
.../mor-clean1-column-stats-index-table.json | 2 +
...mor-delete-block1-column-stats-index-table.json | 3 +
...-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json | 10 +
...-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json | 5 +
...-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json | 5 +
.../hudi/functional/ColumnStatIndexTestBase.scala | 162 ++++++--
.../hudi/functional/TestColumnStatsIndex.scala | 451 ++++++++++++++++++++-
.../functional/TestColumnStatsIndexWithSQL.scala | 26 +-
.../functional/TestSecondaryIndexPruning.scala | 17 +-
.../hudi/command/index/TestFunctionalIndex.scala | 3 +-
.../TestHoodieMetadataTableValidator.java | 2 +-
29 files changed, 896 insertions(+), 158 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 5e244c2b121..0e8d5339cd2 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -239,13 +239,13 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
Option<String>
inflightInstantTimestamp) throws IOException {
HoodieTimer timer = HoodieTimer.start();
- List<MetadataPartitionType> partitionsToInit = new
ArrayList<>(MetadataPartitionType.getValidValues().length);
+ List<MetadataPartitionType> metadataPartitionsToInit = new
ArrayList<>(MetadataPartitionType.getValidValues().length);
try {
boolean exists = metadataTableExists(dataMetaClient);
if (!exists) {
// FILES partition is always required
- partitionsToInit.add(FILES);
+ metadataPartitionsToInit.add(FILES);
}
// check if any of the enabled partition types needs to be initialized
@@ -255,10 +255,10 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
LOG.info("Async metadata indexing disabled and following partitions
already initialized: {}", completedPartitions);
this.enabledPartitionTypes.stream()
.filter(p -> !completedPartitions.contains(p.getPartitionPath())
&& !FILES.equals(p))
- .forEach(partitionsToInit::add);
+ .forEach(metadataPartitionsToInit::add);
}
- if (partitionsToInit.isEmpty()) {
+ if (metadataPartitionsToInit.isEmpty()) {
// No partitions left to initialize, since all the metadata enabled
partitions are either initialized before
// or current in the process of initialization.
initMetadataReader();
@@ -268,13 +268,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
// If there is no commit on the dataset yet, use the
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the timestamp of the latest completed action.
String initializationTime =
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
- // Initialize partitions for the first time using data from the files on
the file system
- if (!initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp)) {
- LOG.error("Failed to initialize MDT from filesystem");
- return false;
- }
-
+ initializeFromFilesystem(initializationTime, metadataPartitionsToInit,
inflightInstantTimestamp);
metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
return true;
} catch (IOException e) {
@@ -344,7 +338,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
* @param partitionsToInit - List of MDT partitions to initialize
* @param inflightInstantTimestamp - Current action instant responsible for
this initialization
*/
- private boolean initializeFromFilesystem(String initializationTime,
List<MetadataPartitionType> partitionsToInit,
+ private void initializeFromFilesystem(String initializationTime,
List<MetadataPartitionType> partitionsToInit,
Option<String>
inflightInstantTimestamp) throws IOException {
Set<String> pendingDataInstants = getPendingDataInstants(dataMetaClient);
@@ -461,8 +455,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
}
if (LOG.isInfoEnabled()) {
- LOG.info("Initializing {} index with {} mappings and {} file groups.",
partitionTypeName, fileGroupCountAndRecordsPair.getKey(),
- fileGroupCountAndRecordsPair.getValue().count());
+ LOG.info("Initializing {} index with {} mappings", partitionTypeName,
fileGroupCountAndRecordsPair.getKey());
}
HoodieTimer partitionInitTimer = HoodieTimer.start();
@@ -482,8 +475,6 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
long totalInitTime = partitionInitTimer.endTimer();
LOG.info("Initializing {} index in metadata table took {} in ms",
partitionTypeName, totalInitTime);
}
-
- return true;
}
/**
@@ -520,9 +511,11 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
}
private Pair<Integer, HoodieData<HoodieRecord>>
initializeColumnStatsPartition(Map<String, Map<String, Long>>
partitionToFilesMap) {
+ // during initialization, we need stats for base and log files.
HoodieData<HoodieRecord> records =
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
engineContext, Collections.emptyMap(), partitionToFilesMap,
dataMetaClient, dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
- dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex());
+ dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
+ dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize());
final int fileGroupCount =
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
return Pair.of(fileGroupCount, records);
@@ -863,12 +856,13 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
* @return List consisting of {@code DirectoryInfo} for each partition found.
*/
private List<DirectoryInfo> listAllPartitionsFromMDT(String
initializationTime, Set<String> pendingDataInstants) throws IOException {
- List<String> allPartitionPaths = metadata.getAllPartitionPaths().stream()
+ List<String> allAbsolutePartitionPaths =
metadata.getAllPartitionPaths().stream()
.map(partitionPath -> dataWriteConfig.getBasePath() +
StoragePath.SEPARATOR_CHAR + partitionPath).collect(Collectors.toList());
- Map<String, List<StoragePathInfo>> partitionFileMap =
metadata.getAllFilesInPartitions(allPartitionPaths);
+ Map<String, List<StoragePathInfo>> partitionFileMap =
metadata.getAllFilesInPartitions(allAbsolutePartitionPaths);
List<DirectoryInfo> dirinfoList = new ArrayList<>(partitionFileMap.size());
for (Map.Entry<String, List<StoragePathInfo>> entry :
partitionFileMap.entrySet()) {
- dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(),
initializationTime, pendingDataInstants));
+ String relativeDirPath = FSUtils.getRelativePartitionPath(new
StoragePath(dataWriteConfig.getBasePath()), new StoragePath(entry.getKey()));
+ dirinfoList.add(new DirectoryInfo(relativeDirPath, entry.getValue(),
initializationTime, pendingDataInstants, false));
}
return dirinfoList;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 65fb5073d7a..1537a758bd8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -154,6 +154,14 @@ public class FSUtils {
return fullFileName.split("_", 2)[0];
}
+ /**
+ * @param filePath
+ * @returns the filename from the given path. Path could be the absolute
path or just partition path and file name.
+ */
+ public static String getFileNameFromPath(String filePath) {
+ return filePath.substring(filePath.lastIndexOf("/") + 1);
+ }
+
/**
* Gets all partition paths assuming date partitioning (year, month, day)
three levels down.
* TODO: (Lin) Delete this function after we remove the
assume.date.partitioning config completely.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 1ce3dea58cb..61330466ac5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -83,7 +83,7 @@ public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordReade
@Override
protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
- throw new IllegalStateException("Not expected to see delete records in
this log-scan mode. Check Job Config");
+ // no - op
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index d8fa78935f3..1abd7d05a13 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -840,12 +840,17 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
}
Set<String> keySet = new TreeSet<>(recordKeys);
+ Set<String> deletedRecordsFromLogs = new HashSet<>();
Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new
HashMap<>();
logRecordScanner.getRecords().forEach(record -> {
HoodieMetadataPayload payload = record.getData();
- String recordKey = payload.getRecordKeyFromSecondaryIndex();
- if (keySet.contains(recordKey)) {
- logRecordsMap.put(recordKey, record);
+ if (!payload.isDeleted()) { // process only valid records.
+ String recordKey = payload.getRecordKeyFromSecondaryIndex();
+ if (keySet.contains(recordKey)) {
+ logRecordsMap.put(recordKey, record);
+ }
+ } else {
+ deletedRecordsFromLogs.add(record.getRecordKey());
}
});
@@ -856,7 +861,11 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord =
HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, newRecord);
return mergedRecord.orElseGet(null);
}));
- baseFileRecords.forEach((key, value) -> recordKeyMap.put(key,
value.getRecordKey()));
+ baseFileRecords.forEach((key, value) -> {
+ if (!deletedRecordsFromLogs.contains(key)) {
+ recordKeyMap.put(key, value.getRecordKey());
+ }
+ });
} catch (IOException ioe) {
throw new HoodieIOException("Error merging records from metadata table
for " + recordKeys.size() + " key : ", ioe);
} finally {
@@ -931,17 +940,22 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
List<String> sortedSecondaryKeys = new ArrayList<>(secondaryKeys);
secondaryKeySet.addAll(sortedSecondaryKeys);
Collections.sort(sortedSecondaryKeys);
+ Set<String> deletedRecordKeysFromLogs = new HashSet<>();
logRecordScanner.getRecords().forEach(record -> {
HoodieMetadataPayload payload = record.getData();
- String secondaryKey = payload.key;
- if (secondaryKeySet.contains(secondaryKey)) {
- String recordKey = payload.getRecordKeyFromSecondaryIndex();
- logRecordsMap.computeIfAbsent(secondaryKey, k -> new
HashMap<>()).put(recordKey, record);
+ if (!payload.isDeleted()) {
+ String secondaryKey = payload.key;
+ if (secondaryKeySet.contains(secondaryKey)) {
+ String recordKey = payload.getRecordKeyFromSecondaryIndex();
+ logRecordsMap.computeIfAbsent(secondaryKey, k -> new
HashMap<>()).put(recordKey, record);
+ }
+ } else {
+ deletedRecordKeysFromLogs.add(record.getRecordKey());
}
});
- return readNonUniqueRecordsAndMergeWithLogRecords(baseFileReader,
sortedSecondaryKeys, logRecordsMap, timings, partitionName);
+ return readNonUniqueRecordsAndMergeWithLogRecords(baseFileReader,
sortedSecondaryKeys, logRecordsMap, timings, partitionName,
deletedRecordKeysFromLogs);
} catch (IOException ioe) {
throw new HoodieIOException("Error merging records from metadata table
for " + secondaryKeys.size() + " key : ", ioe);
} finally {
@@ -955,7 +969,8 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
List<String> sortedKeys,
Map<String, HashMap<String, HoodieRecord>>
logRecordsMap,
List<Long> timings,
-
String partitionName) throws IOException {
+
String partitionName,
+
Set<String> deleteRecordKeysFromLogs) throws
IOException {
HoodieTimer timer = HoodieTimer.start();
Map<String, List<HoodieRecord<HoodieMetadataPayload>>> resultMap = new
HashMap<>();
@@ -978,9 +993,13 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
if (logRecordsMap.isEmpty() && !baseFileRecordsMap.isEmpty()) {
// file slice has only base file
timings.add(timer.endTimer());
+ if (!deleteRecordKeysFromLogs.isEmpty()) { // remove deleted records
from log from base file record list
+ deleteRecordKeysFromLogs.forEach(key ->
baseFileRecordsMap.remove(key));
+ }
return baseFileRecordsMap;
}
+ // check why we are not considering records missing from logs, but only
from base file.
logRecordsMap.forEach((secondaryKey, logRecords) -> {
if (!baseFileRecordsMap.containsKey(secondaryKey)) {
List<HoodieRecord<HoodieMetadataPayload>> recordList = logRecords
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 4300811be08..aff3721b745 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -206,23 +206,23 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
}
protected HoodieMetadataPayload(String key, int type, Map<String,
HoodieMetadataFileInfo> filesystemMetadata) {
- this(key, type, filesystemMetadata, null, null, null, null);
+ this(key, type, filesystemMetadata, null, null, null, null, false);
}
protected HoodieMetadataPayload(String key, HoodieMetadataBloomFilter
metadataBloomFilter) {
- this(key, MetadataPartitionType.BLOOM_FILTERS.getRecordType(), null,
metadataBloomFilter, null, null, null);
+ this(key, MetadataPartitionType.BLOOM_FILTERS.getRecordType(), null,
metadataBloomFilter, null, null, null, metadataBloomFilter.getIsDeleted());
}
protected HoodieMetadataPayload(String key, HoodieMetadataColumnStats
columnStats, int recordType) {
- this(key, recordType, null, null, columnStats, null, null);
+ this(key, recordType, null, null, columnStats, null, null,
columnStats.getIsDeleted());
}
private HoodieMetadataPayload(String key, HoodieRecordIndexInfo
recordIndexMetadata) {
- this(key, MetadataPartitionType.RECORD_INDEX.getRecordType(), null, null,
null, recordIndexMetadata, null);
+ this(key, MetadataPartitionType.RECORD_INDEX.getRecordType(), null, null,
null, recordIndexMetadata, null, false);
}
private HoodieMetadataPayload(String key, HoodieSecondaryIndexInfo
secondaryIndexMetadata) {
- this(key, MetadataPartitionType.SECONDARY_INDEX.getRecordType(), null,
null, null, null, secondaryIndexMetadata);
+ this(key, MetadataPartitionType.SECONDARY_INDEX.getRecordType(), null,
null, null, null, secondaryIndexMetadata,
secondaryIndexMetadata.getIsDeleted());
}
protected HoodieMetadataPayload(String key, int type,
@@ -230,7 +230,8 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
HoodieMetadataBloomFilter
metadataBloomFilter,
HoodieMetadataColumnStats columnStats,
HoodieRecordIndexInfo recordIndexMetadata,
- HoodieSecondaryIndexInfo
secondaryIndexMetadata) {
+ HoodieSecondaryIndexInfo
secondaryIndexMetadata,
+ boolean isDeletedRecord) {
this.key = key;
this.type = type;
this.filesystemMetadata = filesystemMetadata;
@@ -238,6 +239,7 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
this.columnStatMetadata = columnStats;
this.recordIndexMetadata = recordIndexMetadata;
this.secondaryIndexMetadata = secondaryIndexMetadata;
+ this.isDeletedRecord = isDeletedRecord;
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index cee1e8e9cab..765d819b09a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -75,7 +75,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.ExternalFilePathUtil;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -142,10 +141,10 @@ import static
org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
import static
org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
import static
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
-import static
org.apache.hudi.common.config.HoodieCommonConfig.MAX_DFS_STREAM_BUFFER_SIZE;
import static
org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION;
import static
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
import static
org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
+import static org.apache.hudi.common.fs.FSUtils.getFileNameFromPath;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
@@ -751,12 +750,8 @@ public class HoodieTableMetadataUtil {
return engineContext.parallelize(deleteFileList, parallelism)
.flatMap(deleteFileInfoPair -> {
String partitionPath = deleteFileInfoPair.getLeft();
- String filePath = deleteFileInfoPair.getRight();
-
- if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())
|| ExternalFilePathUtil.isExternallyCreatedFile(filePath)) {
- return getColumnStatsRecords(partitionPath, filePath,
dataMetaClient, columnsToIndex, true).iterator();
- }
- return Collections.emptyListIterator();
+ String fileName = deleteFileInfoPair.getRight();
+ return getColumnStatsRecords(partitionPath, fileName,
dataMetaClient, columnsToIndex, true).iterator();
});
}
@@ -953,7 +948,8 @@ public class HoodieTableMetadataUtil {
HoodieTableMetaClient dataMetaClient,
boolean isColumnStatsIndexEnabled,
int
columnStatsIndexParallelism,
-
List<String> targetColumnsForColumnStatsIndex) {
+
List<String> targetColumnsForColumnStatsIndex,
+ int
maxReaderBufferSize) {
// Find the columns to index
final List<String> columnsToIndex =
getColumnsToIndex(isColumnStatsIndexEnabled,
targetColumnsForColumnStatsIndex,
@@ -972,16 +968,10 @@ public class HoodieTableMetadataUtil {
// Create records MDT
int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(),
columnStatsIndexParallelism), 1);
return engineContext.parallelize(partitionFileFlagTupleList,
parallelism).flatMap(partitionFileFlagTuple -> {
- final String partitionName = partitionFileFlagTuple.f0;
+ final String partitionPath = partitionFileFlagTuple.f0;
final String filename = partitionFileFlagTuple.f1;
final boolean isDeleted = partitionFileFlagTuple.f2;
- if (!FSUtils.isBaseFile(new StoragePath(filename)) ||
!filename.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
- LOG.warn("Ignoring file {} as it is not a PARQUET file", filename);
- return Stream.<HoodieRecord>empty().iterator();
- }
-
- final String filePathWithPartition = partitionName + "/" + filename;
- return getColumnStatsRecords(partitionName, filePathWithPartition,
dataMetaClient, columnsToIndex, isDeleted).iterator();
+ return getColumnStatsRecords(partitionPath, filename, dataMetaClient,
columnsToIndex, isDeleted, maxReaderBufferSize).iterator();
});
}
@@ -1221,54 +1211,62 @@ public class HoodieTableMetadataUtil {
return
HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(),
columnRangeMetadataList, false);
}
- return getColumnStatsRecords(writeStat.getPartitionPath(),
writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
+ String filePath = writeStat.getPath();
+ return getColumnStatsRecords(writeStat.getPartitionPath(),
getFileNameFromPath(filePath), datasetMetaClient, columnsToIndex, false);
}
private static Stream<HoodieRecord> getColumnStatsRecords(String
partitionPath,
- String filePath,
+ String fileName,
HoodieTableMetaClient datasetMetaClient,
List<String>
columnsToIndex,
boolean isDeleted)
{
- String filePartitionPath = filePath.startsWith("/") ?
filePath.substring(1) : filePath;
- String fileName =
filePartitionPath.substring(filePartitionPath.lastIndexOf("/") + 1);
+ return getColumnStatsRecords(partitionPath, fileName, datasetMetaClient,
columnsToIndex, isDeleted, -1);
+ }
+
+ private static Stream<HoodieRecord> getColumnStatsRecords(String
partitionPath,
+ String fileName,
+
HoodieTableMetaClient datasetMetaClient,
+ List<String>
columnsToIndex,
+ boolean isDeleted,
+ int maxBufferSize)
{
if (isDeleted) {
- // TODO we should delete records instead of stubbing them
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList =
columnsToIndex.stream()
.map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
.collect(Collectors.toList());
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath,
columnRangeMetadataList, true);
}
-
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata =
- readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex, false, Option.empty());
+ readColumnRangeMetadataFrom(partitionPath, fileName,
datasetMetaClient, columnsToIndex, maxBufferSize);
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath,
columnRangeMetadata, false);
}
- private static List<HoodieColumnRangeMetadata<Comparable>>
readColumnRangeMetadataFrom(String filePath,
+ private static List<HoodieColumnRangeMetadata<Comparable>>
readColumnRangeMetadataFrom(String partitionPath,
+
String fileName,
HoodieTableMetaClient datasetMetaClient,
List<String> columnsToIndex,
-
boolean shouldReadColumnStatsForLogFiles,
-
Option<Schema> writerSchemaOpt) {
+
int maxBufferSize) {
+ String partitionPathFileName = (partitionPath.equals(EMPTY_PARTITION_NAME)
|| partitionPath.equals(NON_PARTITIONED_NAME)) ? fileName
+ : partitionPath + "/" + fileName;
try {
- StoragePath fullFilePath = new
StoragePath(datasetMetaClient.getBasePath(), filePath);
- if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ StoragePath fullFilePath = new
StoragePath(datasetMetaClient.getBasePath(), partitionPathFileName);
+ if
(partitionPathFileName.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
return HoodieIOFactory.getIOFactory(datasetMetaClient.getStorage())
.getFileFormatUtils(HoodieFileFormat.PARQUET)
.readColumnStatsFromMetadata(datasetMetaClient.getStorage(),
fullFilePath, columnsToIndex);
- } else if (FSUtils.isLogFile(fullFilePath) &&
shouldReadColumnStatsForLogFiles) {
- LOG.warn("Reading log file: {}, to build column range metadata.",
fullFilePath);
- return getLogFileColumnRangeMetadata(fullFilePath.toString(),
datasetMetaClient, columnsToIndex, writerSchemaOpt);
+ } else if (FSUtils.isLogFile(fileName)) {
+ Option<Schema> writerSchemaOpt =
tryResolveSchemaForTable(datasetMetaClient);
+ LOG.warn("Reading log file: {}, to build column range metadata.",
partitionPathFileName);
+ return getLogFileColumnRangeMetadata(fullFilePath.toString(),
datasetMetaClient, columnsToIndex, writerSchemaOpt, maxBufferSize);
}
-
- LOG.warn("Column range index not supported for: {}", filePath);
+ LOG.warn("Column range index not supported for: {}",
partitionPathFileName);
return Collections.emptyList();
} catch (Exception e) {
// NOTE: In case reading column range metadata from individual file
failed,
// we simply fall back, in lieu of failing the whole task
- LOG.error("Failed to fetch column range metadata for: {}", filePath);
+ LOG.error("Failed to fetch column range metadata for: {}",
partitionPathFileName);
return Collections.emptyList();
}
}
@@ -1280,7 +1278,8 @@ public class HoodieTableMetadataUtil {
protected static List<HoodieColumnRangeMetadata<Comparable>>
getLogFileColumnRangeMetadata(String filePath,
HoodieTableMetaClient datasetMetaClient,
List<String> columnsToIndex,
-
Option<Schema> writerSchemaOpt) {
+
Option<Schema> writerSchemaOpt,
+
int maxBufferSize) throws IOException {
if (writerSchemaOpt.isPresent()) {
List<Schema.Field> fieldsToIndex =
writerSchemaOpt.get().getFields().stream()
.filter(field -> columnsToIndex.contains(field.name()))
@@ -1291,15 +1290,18 @@ public class HoodieTableMetadataUtil {
.withStorage(datasetMetaClient.getStorage())
.withBasePath(datasetMetaClient.getBasePath())
.withLogFilePaths(Collections.singletonList(filePath))
- .withBufferSize(MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
+ .withBufferSize(maxBufferSize)
.withLatestInstantTime(datasetMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().getTimestamp())
.withReaderSchema(writerSchemaOpt.get())
.withTableMetaClient(datasetMetaClient)
.withLogRecordScannerCallback(records::add)
.build();
- scanner.scan(false);
+ scanner.scan();
+ if (records.isEmpty()) {
+ return Collections.emptyList();
+ }
Map<String, HoodieColumnRangeMetadata<Comparable>>
columnRangeMetadataMap =
- collectColumnRangeMetadata(records, fieldsToIndex, filePath,
writerSchemaOpt.get());
+ collectColumnRangeMetadata(records, fieldsToIndex,
getFileNameFromPath(filePath), writerSchemaOpt.get());
return new ArrayList<>(columnRangeMetadataMap.values());
}
return Collections.emptyList();
@@ -2138,12 +2140,12 @@ public class HoodieTableMetadataUtil {
LOG.debug("Indexing following columns for partition stats index: {}",
columnsToIndex);
// Create records for MDT
int parallelism = Math.max(Math.min(partitionInfoList.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
- Option<Schema> writerSchema = lazyWriterSchemaOpt.get();
return engineContext.parallelize(partitionInfoList,
parallelism).flatMap(partitionInfo -> {
final String partitionPath = partitionInfo.getRelativePath();
// Step 1: Collect Column Metadata for Each File
List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata =
partitionInfo.getFileNameToSizeMap().keySet().stream()
- .map(fileName -> getFileStatsRangeMetadata(partitionPath,
partitionPath + "/" + fileName, dataTableMetaClient, columnsToIndex, false,
true, writerSchemaOpt))
+ .map(fileName -> getFileStatsRangeMetadata(partitionPath, fileName,
dataTableMetaClient, columnsToIndex, false,
+ metadataConfig.getMaxReaderBufferSize()))
.collect(Collectors.toList());
return collectAndProcessColumnMetadata(fileColumnMetadata,
partitionPath, true).iterator();
@@ -2151,20 +2153,17 @@ public class HoodieTableMetadataUtil {
}
private static List<HoodieColumnRangeMetadata<Comparable>>
getFileStatsRangeMetadata(String partitionPath,
-
String filePath,
+
String fileName,
HoodieTableMetaClient datasetMetaClient,
List<String> columnsToIndex,
boolean isDeleted,
-
boolean shouldReadColumnMetadataForLogFiles,
-
Option<Schema> writerSchemaOpt) {
- String filePartitionPath = filePath.startsWith("/") ?
filePath.substring(1) : filePath;
- String fileName = FSUtils.getFileName(filePath, partitionPath);
+
int maxBufferSize) {
if (isDeleted) {
return columnsToIndex.stream()
.map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
.collect(Collectors.toList());
}
- return readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex, shouldReadColumnMetadataForLogFiles, writerSchemaOpt);
+ return readColumnRangeMetadataFrom(partitionPath, fileName,
datasetMetaClient, columnsToIndex, maxBufferSize);
}
public static HoodieData<HoodieRecord>
convertMetadataToPartitionStatsRecords(HoodieCommitMetadata commitMetadata,
@@ -2296,7 +2295,8 @@ public class HoodieTableMetadataUtil {
return columnRangeMap.values().stream().collect(Collectors.toList());
}
- return getFileStatsRangeMetadata(writeStat.getPartitionPath(),
writeStat.getPath(), datasetMetaClient, columnsToIndex, false, false,
writerSchemaOpt);
+ String filePath = writeStat.getPath();
+ return getFileStatsRangeMetadata(writeStat.getPartitionPath(),
getFileNameFromPath(filePath), datasetMetaClient, columnsToIndex, false, -1);
}
public static String getPartitionStatsIndexKey(String partitionPath, String
columnName) {
@@ -2434,13 +2434,22 @@ public class HoodieTableMetadataUtil {
private boolean isHoodiePartition = false;
public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos,
String maxInstantTime, Set<String> pendingDataInstants) {
+ this(relativePath, pathInfos, maxInstantTime, pendingDataInstants, true);
+ }
+
+ /**
+ * When files are directly fetched from Metadata table we do not need to
validate HoodiePartitions.
+ */
+ public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos,
String maxInstantTime, Set<String> pendingDataInstants,
+ boolean validateHoodiePartitions) {
this.relativePath = relativePath;
// Pre-allocate with the maximum length possible
filenameToSizeMap = new HashMap<>(pathInfos.size());
// Presence of partition meta file implies this is a HUDI partition
- isHoodiePartition = pathInfos.stream().anyMatch(status ->
status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX));
+ // if input files are directly fetched from MDT, it may not contain the
HoodiePartitionMetadata file. So, we can ignore the validation for
isHoodiePartition.
+ isHoodiePartition = !validateHoodiePartitions ||
pathInfos.stream().anyMatch(status ->
status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX));
for (StoragePathInfo pathInfo : pathInfos) {
// Do not attempt to search for more subdirectories inside directories
that are partitions
if (!isHoodiePartition && pathInfo.isDirectory()) {
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index 7621aca7c7f..ad5b1230bde 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -256,6 +256,10 @@ public class TestFSUtils extends HoodieCommonTestHarness {
assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(rlPath));
assertEquals(0, FSUtils.getStageIdFromLogPath(rlPath));
assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(rlPath));
+
+ assertEquals(logFile, FSUtils.getFileNameFromPath("/tmp/path/" + logFile));
+ assertEquals(logFile, FSUtils.getFileNameFromPath("/tmp/abc/def/path/" +
logFile));
+ assertEquals(logFile, FSUtils.getFileNameFromPath("/tmp/" + logFile));
}
@Test
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
index 7fcc0d16193..ce2cae78342 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
@@ -29,18 +29,23 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests {@link HoodieMetadataPayload}.
*/
public class TestHoodieMetadataPayload extends HoodieCommonTestHarness {
public static final String PARTITION_NAME = "2022/10/01";
+ public static final String PARTITION_NAME2 = "2023/10/01";
+ public static final String PARTITION_NAME3 = "2024/10/01";
@Test
public void testFileSystemMetadataPayloadMerging() {
@@ -148,6 +153,27 @@ public class TestHoodieMetadataPayload extends
HoodieCommonTestHarness {
).getData(),
deletionRecord2.getData().preCombine(deletionRecord1.getData().preCombine(additionRecord.getData()))
);
+
+    // let's delete all files
+ List<String> allDeletedFileList = new ArrayList<>();
+ allDeletedFileList.add("file1.parquet");
+ allDeletedFileList.add("file2.parquet");
+ allDeletedFileList.add("file3.parquet");
+ allDeletedFileList.add("file4.parquet");
+ HoodieRecord<HoodieMetadataPayload> allDeletionRecord =
+ HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
Collections.emptyMap(), allDeletedFileList);
+
+ HoodieMetadataPayload combinedPayload =
allDeletionRecord.getData().preCombine(additionRecord.getData());
+
assertEquals(HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
Collections.emptyMap(), Collections.emptyList()).getData(), combinedPayload);
+ assertTrue(combinedPayload.filesystemMetadata.isEmpty());
+
+ // test all partition record
+ HoodieRecord<HoodieMetadataPayload> allPartitionsRecord =
HoodieMetadataPayload.createPartitionListRecord(Arrays.asList(PARTITION_NAME,
PARTITION_NAME2, PARTITION_NAME3), false);
+ HoodieRecord<HoodieMetadataPayload> partitionDeletedRecord =
HoodieMetadataPayload.createPartitionListRecord(Collections.singletonList(PARTITION_NAME),
true);
+    // combine to ensure the deleted partitions are not seen
+ HoodieMetadataPayload payload =
partitionDeletedRecord.getData().preCombine(allPartitionsRecord.getData());
+
assertEquals(HoodieMetadataPayload.createPartitionListRecord(Arrays.asList(PARTITION_NAME2,
PARTITION_NAME3), false).getData(),
+ payload);
}
@Test
@@ -211,6 +237,8 @@ public class TestHoodieMetadataPayload extends
HoodieCommonTestHarness {
deletedColumnStatsRecord.getData().preCombine(columnStatsRecord.getData());
assertEquals(deletedColumnStatsRecord.getData(),
deletedCombinedMetadataPayload);
+
assertFalse(deletedCombinedMetadataPayload.getInsertValue(null).isPresent());
+ assertTrue(deletedCombinedMetadataPayload.isDeleted());
// NOTE: In this case, proper incoming record will be overwriting
previously deleted
// record
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 4ccc48b519d..9586171d97a 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -26,8 +26,10 @@ import
org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.FileCreateUtils;
@@ -206,7 +208,12 @@ public class TestHoodieTableMetadataUtil extends
HoodieCommonTestHarness {
public void testGetLogFileColumnRangeMetadata() throws Exception {
HoodieLocalEngineContext engineContext = new
HoodieLocalEngineContext(metaClient.getStorageConf());
String instant1 = "20230918120000000";
- hoodieTestTable = hoodieTestTable.addCommit(instant1);
+
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ commitMetadata.addMetadata("test", "test");
+ commitMetadata.setOperationType(WriteOperationType.INSERT);
+ commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY,
HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS.toString());
+ hoodieTestTable = hoodieTestTable.addCommit(instant1,
Option.of(commitMetadata));
String instant2 = "20230918121110000";
hoodieTestTable = hoodieTestTable.addCommit(instant2);
List<HoodieTableMetadataUtil.DirectoryInfo> partitionInfoList = new
ArrayList<>();
@@ -243,7 +250,8 @@ public class TestHoodieTableMetadataUtil extends
HoodieCommonTestHarness {
storagePath2.toString(),
metaClient,
columnsToIndex,
-
Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS));
+
Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS),
+ HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue());
// there must be two ranges for rider and driver
assertEquals(2, columnRangeMetadataLogFile.size());
} catch (Exception e) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java
new file mode 100644
index 00000000000..464ad5ddca1
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.testutils;
+
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata;
+
+/**
+ * Util methods used in tests to fetch col stats records for a log file.
+ */
+public class LogFileColStatsTestUtil {
+
+ public static Option<Row> getLogFileColumnRangeMetadata(String filePath,
HoodieTableMetaClient datasetMetaClient, String latestCommitTime,
+ List<String> columnsToIndex,
Option<Schema> writerSchemaOpt,
+ int maxBufferSize) throws
IOException {
+ if (writerSchemaOpt.isPresent()) {
+ List<Schema.Field> fieldsToIndex =
writerSchemaOpt.get().getFields().stream()
+ .filter(field -> columnsToIndex.contains(field.name()))
+ .collect(Collectors.toList());
+ List<HoodieRecord> records = new ArrayList<>();
+ HoodieUnMergedLogRecordScanner scanner =
HoodieUnMergedLogRecordScanner.newBuilder()
+ .withStorage(datasetMetaClient.getStorage())
+ .withBasePath(datasetMetaClient.getBasePath())
+ .withLogFilePaths(Collections.singletonList(filePath))
+ .withBufferSize(maxBufferSize)
+ .withLatestInstantTime(latestCommitTime)
+ .withReaderSchema(writerSchemaOpt.get())
+ .withLogRecordScannerCallback(records::add)
+ .build();
+ scanner.scan();
+ if (records.isEmpty()) {
+ return Option.empty();
+ }
+ Map<String, HoodieColumnRangeMetadata<Comparable>>
columnRangeMetadataMap =
+ collectColumnRangeMetadata(records, fieldsToIndex, filePath,
writerSchemaOpt.get());
+ List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList =
new ArrayList<>(columnRangeMetadataMap.values());
+ return Option.of(getColStatsEntry(filePath, columnRangeMetadataList));
+ } else {
+ throw new HoodieException("Writer schema needs to be set");
+ }
+ }
+
+ private static Row getColStatsEntry(String logFilePath,
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList) {
+ Collections.sort(columnRangeMetadataList, (o1, o2) ->
o1.getColumnName().compareTo(o2.getColumnName()));
+ Object[] values = new Object[(columnRangeMetadataList.size() * 3) + 2];
+ values[0] = logFilePath.substring(logFilePath.lastIndexOf("/") + 1);
+ values[1] = columnRangeMetadataList.get(0).getValueCount();
+ int counter = 2;
+ for (HoodieColumnRangeMetadata columnRangeMetadata:
columnRangeMetadataList) {
+ values[counter++] = columnRangeMetadata.getValueCount();
+ values[counter++] = columnRangeMetadata.getMinValue();
+ values[counter++] = columnRangeMetadata.getMaxValue();
+ }
+ return new GenericRow(values);
+ }
+
+ public static Option<Schema> getSchemaForTable(HoodieTableMetaClient
metaClient) throws Exception {
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+ return Option.of(schemaResolver.getTableAvroSchema());
+ }
+}
+
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json
new file mode 100644
index 00000000000..83790766db2
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json
@@ -0,0 +1,2 @@
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
959sdc","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue
[...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
984sdh","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minV
[...]
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap1-column-stats-index-table.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap1-column-stats-index-table.json
new file mode 100644
index 00000000000..75aa7ada3ad
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap1-column-stats-index-table.json
@@ -0,0 +1,4 @@
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
959sdc","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue
[...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
989sda","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_min
[...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
989sda","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minV
[...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
989sda","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_mi
[...]
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap2-column-stats-index-table.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap2-column-stats-index-table.json
new file mode 100644
index 00000000000..9c52707a27d
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-bootstrap2-column-stats-index-table.json
@@ -0,0 +1,5 @@
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
959sdc","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue
[...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
989sda","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_min
[...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
989sda","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minV
[...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
989sda","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_mi
[...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
989sda","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":200000.000,"c3_minValue":0.100,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_m
[...]
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-clean1-column-stats-index-table.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-clean1-column-stats-index-table.json
new file mode 100644
index 00000000000..a08dea39c05
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/cow-clean1-column-stats-index-table.json
@@ -0,0 +1,2 @@
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
989sda","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_min
[...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
989sda","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_m
[...]
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/delete-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/delete-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
new file mode 100644
index 00000000000..17e8f877c50
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/delete-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
@@ -0,0 +1 @@
+{"c1":633,"c2":"
987sdk","c3":375.308,"c4":"2021-11-18T23:34:44.180-08:00","c5":0,"c6":"2020-01-01","c7":"NA==","c8":9}
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json
new file mode 100644
index 00000000000..dcbf49b141f
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json
@@ -0,0 +1,2 @@
+{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":"
984sdh","c2_minValue":"
980sdd","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":80,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_
[...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
959sdc","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue
[...]
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap1-column-stats-index-table.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap1-column-stats-index-table.json
new file mode 100644
index 00000000000..146097347e0
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap1-column-stats-index-table.json
@@ -0,0 +1,3 @@
+{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":"
984sdh","c2_minValue":"
980sdd","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":80,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_
[...]
+{"c1_maxValue":639,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":"
989sda","c2_minValue":"
980sdd","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"aQ==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8
[...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
959sdc","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue
[...]
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap2-column-stats-index-table.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap2-column-stats-index-table.json
new file mode 100644
index 00000000000..6256be16c1d
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-bootstrap2-column-stats-index-table.json
@@ -0,0 +1,5 @@
+{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":"
984sdh","c2_minValue":"
980sdd","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":80,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_
[...]
+{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":"
984sdh","c2_minValue":"
980sdd","c2_nullCount":0,"c3_maxValue":200000.000,"c3_minValue":0.100,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":80,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qQ==","c7_nullCount":0,"c8_maxValue":9,"c8
[...]
+{"c1_maxValue":639,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":"
989sda","c2_minValue":"
980sdd","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"aQ==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8
[...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
959sdc","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue
[...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
989sda","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":200000.000,"c3_minValue":0.100,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_m
[...]
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-clean1-column-stats-index-table.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-clean1-column-stats-index-table.json
new file mode 100644
index 00000000000..8c7b1125314
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-clean1-column-stats-index-table.json
@@ -0,0 +1,2 @@
+{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":"
984sdh","c2_minValue":"
980sdd","c2_nullCount":0,"c3_maxValue":10000.768,"c3_minValue":0.001,"c3_nullCount":0,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-19T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":80,"c5_minValue":-100,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_
[...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
989sda","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_min
[...]
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-delete-block1-column-stats-index-table.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-delete-block1-column-stats-index-table.json
new file mode 100644
index 00000000000..fc6c936c787
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/mor-delete-block1-column-stats-index-table.json
@@ -0,0 +1,3 @@
+{"c1_nullCount":0,"c2_nullCount":0,"c3_nullCount":0,"c4_nullCount":0,"c5_nullCount":0,"c6_nullCount":0,"c7_nullCount":0,"c8_nullCount":0,"valueCount":0}
+{"c1_maxValue":639,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":"
989sda","c2_minValue":"
980sdd","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":0.300,"c3_nullCount":1,"c4_maxValue":"2021-11-19T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":1000,"c5_minValue":-1000,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"aQ==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8
[...]
+{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":"
959sdc","c2_minValue":"
0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue
[...]
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update2-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update2-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
new file mode 100644
index 00000000000..35ae749ddc3
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update2-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
@@ -0,0 +1,10 @@
+{"c1":323,"c2":"
980sdd","c3":null,"c4":"2021-11-19T23:34:44.201-08:00","c5":70,"c6":"2020-01-15","c7":"Ag==","c8":9}
+{"c1":326,"c2":"
981sde","c3":64.768,"c4":"2021-11-19T23:34:44.201-08:00","c5":80,"c6":"2020-10-13","c7":"AA==","c8":9}
+{"c1":555,"c2":"
982sdf","c3":153.431,"c4":"2021-11-19T23:34:44.186-08:00","c5":10,"c6":"2020-03-12","c7":"rw==","c8":9}
+{"c1":556,"c2":"
983sdg","c3":246.427,"c4":"2021-11-19T23:34:44.186-08:00","c5":45,"c6":"2020-10-08","c7":"qw==","c8":9}
+{"c1":562,"c2":"
984sdh","c3":977.328,"c4":"2021-11-19T23:34:44.181-08:00","c5":-100,"c6":"2020-10-21","c7":"SA==","c8":9}
+{"c1":619,"c2":"
985sdi","c3":230.320,"c4":"2021-11-19T23:34:44.180-08:00","c5":1000,"c6":"2020-02-13","c7":"QA==","c8":9}
+{"c1":624,"c2":"
986sdj","c3":580.317,"c4":"2021-11-18T23:34:44.180-08:00","c5":-1,"c6":"2020-10-10","c7":"PQ==","c8":9}
+{"c1":633,"c2":"
987sdk","c3":375.308,"c4":"2021-11-18T23:34:44.180-08:00","c5":-1000,"c6":"2020-01-01","c7":"NA==","c8":9}
+{"c1":638,"c2":"
988sdl","c3":904.304,"c4":"2021-11-18T23:34:44.179-08:00","c5":20,"c6":"2020-08-25","c7":"MA==","c8":9}
+{"c1":639,"c2":"
989sda","c3":0.300,"c4":"2021-11-18T23:34:44.179-08:00","c5":90,"c6":"2020-04-21","c7":"aa==","c8":9}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update3-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update3-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
new file mode 100644
index 00000000000..5e04406cf21
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update3-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
@@ -0,0 +1,5 @@
+{"c1":323,"c2":"
980sdd","c3":10.00,"c4":"2021-11-19T23:34:44.201-08:00","c5":70,"c6":"2020-01-15","c7":"Ag==","c8":9}
+{"c1":326,"c2":"
981sde","c3":10000.768,"c4":"2021-11-19T23:34:44.201-08:00","c5":80,"c6":"2020-10-13","c7":"AA==","c8":9}
+{"c1":555,"c2":"
982sdf","c3":2.431,"c4":"2021-11-19T23:34:44.186-08:00","c5":10,"c6":"2020-03-12","c7":"rw==","c8":9}
+{"c1":556,"c2":"
983sdg","c3":0.001,"c4":"2021-11-19T23:34:44.186-08:00","c5":45,"c6":"2020-10-08","c7":"qw==","c8":9}
+{"c1":562,"c2":"
984sdh","c3":5.328,"c4":"2021-11-19T23:34:44.181-08:00","c5":-100,"c6":"2020-10-21","c7":"SA==","c8":9}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update4-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update4-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
new file mode 100644
index 00000000000..a83a82d8b8b
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/update4-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json
@@ -0,0 +1,5 @@
+{"c1":323,"c2":"
980sdd","c3":200000.00,"c4":"2021-11-19T23:34:44.201-08:00","c5":70,"c6":"2020-01-15","c7":"Aj==","c8":9}
+{"c1":326,"c2":"
981sde","c3":100.768,"c4":"2021-11-19T23:34:44.201-08:00","c5":80,"c6":"2020-10-13","c7":"AB==","c8":9}
+{"c1":555,"c2":"
982sdf","c3":20.431,"c4":"2021-11-19T23:34:44.186-08:00","c5":10,"c6":"2020-03-12","c7":"rx==","c8":9}
+{"c1":556,"c2":"
983sdg","c3":0.1,"c4":"2021-11-19T23:34:44.186-08:00","c5":45,"c6":"2020-10-08","c7":"qf==","c8":9}
+{"c1":562,"c2":"
984sdh","c3":4.328,"c4":"2021-11-19T23:34:44.181-08:00","c5":-100,"c6":"2020-10-21","c7":"SL==","c8":9}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
index 779abafb2da..8839f6d55d2 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
@@ -18,26 +18,37 @@
package org.apache.hudi.functional
+
+import org.apache.avro.Schema
import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
import org.apache.hudi.HoodieConversionUtils.toProperties
+import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{HoodieMetadataConfig,
HoodieStorageConfig}
-import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.model.{HoodieBaseFile, HoodieFileGroup,
HoodieLogFile, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.view.FileSystemViewManager
+import org.apache.hudi.config.HoodieCompactionConfig
import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase
import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
-
import org.apache.spark.sql._
+import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestParams
+import org.apache.hudi.testutils.{HoodieSparkClientTestBase,
LogFileColStatsTestUtil}
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.types._
+import org.apache.spark.sql.DataFrame
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api._
import org.junit.jupiter.params.provider.Arguments
import java.math.BigInteger
import java.sql.{Date, Timestamp}
-
+import java.util
+import java.util.List
+import java.util.stream.Collectors
import scala.collection.JavaConverters._
import scala.util.Random
@@ -76,42 +87,39 @@ class ColumnStatIndexTestBase extends
HoodieSparkClientTestBase {
cleanupSparkContexts()
}
- protected def doWriteAndValidateColumnStats(testCase: ColumnStatsTestCase,
- metadataOpts: Map[String, String],
- hudiOpts: Map[String, String],
- dataSourcePath: String,
- expectedColStatsSourcePath: String,
- operation: String,
- saveMode: SaveMode,
- shouldValidate: Boolean = true):
Unit = {
- val sourceJSONTablePath =
getClass.getClassLoader.getResource(dataSourcePath).toString
+ protected def doWriteAndValidateColumnStats(params: ColumnStatsTestParams):
Unit = {
+
+ val sourceJSONTablePath =
getClass.getClassLoader.getResource(params.dataSourcePath).toString
// NOTE: Schema here is provided for validation that the input date is in
the appropriate format
val inputDF =
spark.read.schema(sourceTableSchema).json(sourceJSONTablePath)
+ val writeOptions: Map[String, String] = params.hudiOpts ++
params.metadataOpts
+
inputDF
.sort("c1")
- .repartition(4, new Column("c1"))
+ .repartition(params.numPartitions, new Column("c1"))
.write
.format("hudi")
- .options(hudiOpts)
- .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
- .option(DataSourceWriteOptions.OPERATION.key, operation)
- .mode(saveMode)
+ .options(writeOptions)
+ .option(DataSourceWriteOptions.OPERATION.key, params.operation)
+ .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(),
String.valueOf(params.parquetMaxFileSize))
+ .option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(),
String.valueOf(params.smallFileLimit))
+ .mode(params.saveMode)
.save(basePath)
dfList = dfList :+ inputDF
metaClient = HoodieTableMetaClient.reload(metaClient)
- if (shouldValidate) {
+ if (params.shouldValidate) {
// Currently, routine manually validating the column stats (by actually
reading every column of every file)
// only supports parquet files. Therefore we skip such validation when
delta-log files are present, and only
// validate in following cases: (1) COW: all operations; (2) MOR: insert
only.
- val shouldValidateColumnStatsManually = testCase.tableType ==
HoodieTableType.COPY_ON_WRITE ||
- operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ val shouldValidateColumnStatsManually = params.testCase.tableType ==
HoodieTableType.COPY_ON_WRITE ||
+
params.operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
validateColumnStatsIndex(
- testCase, metadataOpts, expectedColStatsSourcePath,
shouldValidateColumnStatsManually)
+ params.testCase, params.metadataOpts,
params.expectedColStatsSourcePath, shouldValidateColumnStatsManually,
params.latestCompletedCommit)
}
}
@@ -119,16 +127,19 @@ class ColumnStatIndexTestBase extends
HoodieSparkClientTestBase {
includedCols: Seq[String],
indexedCols: Seq[String],
indexSchema: StructType):
DataFrame = {
- val files = {
- val pathInfoList = storage.listFiles(new StoragePath(tablePath))
- pathInfoList.asScala.filter(fs =>
fs.getPath.getName.endsWith(".parquet"))
- }
-
- spark.createDataFrame(
- files.flatMap(file => {
- val df =
spark.read.schema(sourceTableSchema).parquet(file.getPath.toString)
+ val metaClient = HoodieTableMetaClient.builder().setConf(new
HadoopStorageConfiguration(jsc.hadoopConfiguration())).setBasePath(tablePath).build()
+ val fsv = FileSystemViewManager.createInMemoryFileSystemView(new
HoodieSparkEngineContext(jsc), metaClient,
HoodieMetadataConfig.newBuilder().enable(false).build())
+ fsv.loadAllPartitions()
+ val filegroupList =
fsv.getAllFileGroups.collect(Collectors.toList[HoodieFileGroup])
+ val baseFilesList = filegroupList.stream().flatMap(fileGroup =>
fileGroup.getAllBaseFiles).collect(Collectors.toList[HoodieBaseFile])
+ val baseFiles = baseFilesList.stream()
+ .map[StoragePath](baseFile =>
baseFile.getStoragePath).collect(Collectors.toList[StoragePath]).asScala
+
+ val baseFilesDf = spark.createDataFrame(
+ baseFiles.flatMap(file => {
+ val df = spark.read.schema(sourceTableSchema).parquet(file.toString)
val exprs: Seq[String] =
- s"'${typedLit(file.getPath.getName)}' AS file" +:
+ s"'${typedLit(file.getName)}' AS file" +:
s"sum(1) AS valueCount" +:
df.columns
.filter(col => includedCols.contains(col))
@@ -156,12 +167,61 @@ class ColumnStatIndexTestBase extends
HoodieSparkClientTestBase {
}).asJava,
indexSchema
)
+
+ if (metaClient.getTableConfig.getTableType ==
HoodieTableType.COPY_ON_WRITE) {
+ baseFilesDf // COW table
+ } else {
+ val allLogFiles = filegroupList.stream().flatMap(fileGroup =>
fileGroup.getAllFileSlices)
+ .flatMap(fileSlice => fileSlice.getLogFiles)
+ .collect(Collectors.toList[HoodieLogFile])
+ if (allLogFiles.isEmpty) {
+ baseFilesDf // MOR table, but no log files.
+ } else {
+ val colsToGenerateStats = indexedCols // check for included cols
+ val writerSchemaOpt =
LogFileColStatsTestUtil.getSchemaForTable(metaClient)
+ val latestCompletedCommit =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+ baseFilesDf.union(getColStatsFromLogFiles(allLogFiles,
latestCompletedCommit,
+ scala.collection.JavaConverters.seqAsJavaList(colsToGenerateStats),
+ metaClient,
+ writerSchemaOpt: org.apache.hudi.common.util.Option[Schema],
+ HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue(),
+ indexSchema))
+ }
+ }
+ }
+
+ protected def getColStatsFromLogFiles(logFiles: List[HoodieLogFile],
latestCommit: String, columnsToIndex: util.List[String],
+ datasetMetaClient:
HoodieTableMetaClient,
+ writerSchemaOpt:
org.apache.hudi.common.util.Option[Schema],
+ maxBufferSize: Integer,
+ indexSchema: StructType): DataFrame = {
+ val colStatsEntries =
logFiles.stream().map[org.apache.hudi.common.util.Option[Row]](logFile => {
+ try {
+ getColStatsFromLogFile(logFile.getPath.toString, latestCommit,
columnsToIndex, datasetMetaClient, writerSchemaOpt, maxBufferSize)
+ } catch {
+ case e: Exception =>
+ throw e
+ }
+ }).filter(rowOpt => rowOpt.isPresent).map[Row](rowOpt =>
rowOpt.get()).collect(Collectors.toList[Row])
+ spark.createDataFrame(colStatsEntries, indexSchema)
+ }
+
+ protected def getColStatsFromLogFile(logFilePath: String,
+ latestCommit: String,
+ columnsToIndex: util.List[String],
+ datasetMetaClient:
HoodieTableMetaClient,
+ writerSchemaOpt:
org.apache.hudi.common.util.Option[Schema],
+ maxBufferSize: Integer
+ ):
org.apache.hudi.common.util.Option[Row] = {
+ LogFileColStatsTestUtil.getLogFileColumnRangeMetadata(logFilePath,
datasetMetaClient, latestCommit,
+ columnsToIndex, writerSchemaOpt, maxBufferSize)
}
protected def validateColumnStatsIndex(testCase: ColumnStatsTestCase,
- metadataOpts: Map[String, String],
- expectedColStatsSourcePath: String,
- validateColumnStatsManually: Boolean):
Unit = {
+ metadataOpts: Map[String, String],
+ expectedColStatsSourcePath: String,
+ validateColumnStatsManually: Boolean,
+ latestCompletedCommit: String): Unit
= {
val metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(toProperties(metadataOpts))
.build()
@@ -177,7 +237,8 @@ class ColumnStatIndexTestBase extends
HoodieSparkClientTestBase {
}
}
val (expectedColStatsSchema, _) =
composeIndexSchema(sourceTableSchema.fieldNames, indexedColumns,
sourceTableSchema)
- val validationSortColumns = Seq("c1_maxValue", "c1_minValue",
"c2_maxValue", "c2_minValue")
+ val validationSortColumns = Seq("c1_maxValue", "c1_minValue",
"c2_maxValue", "c2_minValue", "c3_maxValue",
+ "c3_minValue", "c5_maxValue", "c5_minValue")
columnStatsIndex.loadTransposed(sourceTableSchema.fieldNames,
testCase.shouldReadInMemory) { transposedColStatsDF =>
// Match against expected column stats table
@@ -269,14 +330,41 @@ object ColumnStatIndexTestBase {
def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] =
{
java.util.stream.Stream.of(HoodieTableType.values().toStream.flatMap(tableType
=>
Seq(Arguments.arguments(ColumnStatsTestCase(tableType,
shouldReadInMemory = true)),
- Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory
= false)))
+ Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory
= false))
+ )
): _*)
}
def testMetadataColumnStatsIndexParamsForMOR:
java.util.stream.Stream[Arguments] = {
java.util.stream.Stream.of(
Seq(Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ,
shouldReadInMemory = true)),
- Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ,
shouldReadInMemory = false)))
- : _*)
+ Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ,
shouldReadInMemory = false))
+ )
+ : _*)
}
+
+ def testTableTypePartitionTypeParams: java.util.stream.Stream[Arguments] = {
+ java.util.stream.Stream.of(
+ Seq(
+ Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "c8"),
+ // empty partition col represents non-partitioned table.
+ Arguments.arguments(HoodieTableType.COPY_ON_WRITE, ""),
+ Arguments.arguments(HoodieTableType.MERGE_ON_READ, "c8"),
+ Arguments.arguments(HoodieTableType.MERGE_ON_READ, "")
+ )
+ : _*)
+ }
+
+ case class ColumnStatsTestParams(testCase: ColumnStatsTestCase,
+ metadataOpts: Map[String, String],
+ hudiOpts: Map[String, String],
+ dataSourcePath: String,
+ expectedColStatsSourcePath: String,
+ operation: String,
+ saveMode: SaveMode,
+ shouldValidate: Boolean = true,
+ latestCompletedCommit: String = null,
+ numPartitions: Integer = 4,
+ parquetMaxFileSize: Integer = 10 * 1024,
+ smallFileLimit: Integer = 100 * 1024 * 1024)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index d4190957913..9323b9ce2ed 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -18,6 +18,8 @@
package org.apache.hudi.functional
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD,
PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.HoodieConversionUtils.toProperties
@@ -26,14 +28,21 @@ import org.apache.hudi.common.model.{HoodieBaseFile,
HoodieFileGroup, HoodieTabl
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.common.util.ParquetUtils
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig,
HoodieWriteConfig}
import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.view.FileSystemViewManager
+import org.apache.hudi.common.util.{ParquetUtils, StringUtils}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase
+import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestParams
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions,
config}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference,
GreaterThan, Literal, Or}
@@ -70,17 +79,17 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
) ++ metadataOpts
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/input-table-json",
expectedColStatsSourcePath =
"index/colstats/column-stats-index-table.json",
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Overwrite)
+ saveMode = SaveMode.Overwrite))
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/another-input-table-json",
expectedColStatsSourcePath =
"index/colstats/updated-column-stats-index-table.json",
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Append)
+ saveMode = SaveMode.Append))
// NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
// deferred updates), diverging from COW
@@ -90,13 +99,441 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase
{
"index/colstats/mor-updated2-column-stats-index-table.json"
}
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/update-input-table-json",
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Append)
+ saveMode = SaveMode.Append))
}
+ @ParameterizedTest
+ @MethodSource(Array("testTableTypePartitionTypeParams"))
+ def testMetadataColumnStatsIndexInitializationWithUpserts(tableType:
HoodieTableType, partitionCol : String): Unit = {
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5"
+ ) ++ metadataOpts
+
+ // inserts
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // updates
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // delete a subset of recs. this will add a delete log block for MOR table.
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/delete-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ val metadataOpts1 = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ // NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
+ // deferred updates), diverging from COW
+
+ val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap1-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap1-column-stats-index-table.json"
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val latestCompletedCommit =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+ // lets validate that we have log files generated in case of MOR table
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ val metaClient = HoodieTableMetaClient.builder().setConf(new
HadoopStorageConfiguration(jsc.hadoopConfiguration())).setBasePath(basePath).build()
+ val fsv = FileSystemViewManager.createInMemoryFileSystemView(new
HoodieSparkEngineContext(jsc), metaClient,
HoodieMetadataConfig.newBuilder().enable(false).build())
+ fsv.loadAllPartitions()
+ val baseStoragePath = new StoragePath(basePath)
+ val allPartitionPaths = fsv.getPartitionPaths
+ allPartitionPaths.forEach(partitionPath => {
+ val pPath = FSUtils.getRelativePartitionPath(baseStoragePath,
partitionPath)
+ assertTrue (fsv.getLatestFileSlices(pPath).filter(fileSlice =>
fileSlice.hasLogFiles).count() > 0)
+ })
+ fsv.close()
+ }
+
+ // updates a subset which are not deleted and enable col stats and
validate bootstrap
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update3-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // trigger one more upsert and compaction (w/ MOR table) and validate.
+ val expectedColStatsSourcePath1 = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap2-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap2-column-stats-index-table.json"
+ }
+
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update4-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath1,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+ }
+
+ @ParameterizedTest
+ @MethodSource(Array("testTableTypePartitionTypeParams"))
+ def testMetadataColumnStatsIndexInitializationWithRollbacks(tableType:
HoodieTableType, partitionCol : String): Unit = {
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ "hoodie.write.markers.type" -> "DIRECT",
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ ) ++ metadataOpts
+
+ // inserts
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // updates
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ simulateFailureForLatestCommit(tableType, partitionCol)
+
+ val metadataOpts1 = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ // NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
+ // deferred updates), diverging from COW
+
+ val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-bootstrap-rollback1-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json"
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val latestCompletedCommit =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+ // updates a subset which are not deleted and enable col stats and
validate bootstrap
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update3-input-table-json",
+ expectedColStatsSourcePath = expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ latestCompletedCommit,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
assertTrue(metaClient.getActiveTimeline.getRollbackTimeline.countInstants() > 0)
+ }
+
+ def simulateFailureForLatestCommit(tableType: HoodieTableType, partitionCol:
String) : Unit = {
+ // simulate failure for latest commit.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ var baseFileName : String = null
+ var logFileName : String = null
+ val lastCompletedCommit =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ val dataFiles = if (StringUtils.isNullOrEmpty(partitionCol)) {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath, "/"))
+ } else {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath, "9"))
+ }
+ val logFileFileStatus = dataFiles.stream().filter(fileStatus =>
fileStatus.getPath.getName.contains(".log")).findFirst().get()
+ logFileName = logFileFileStatus.getPath.getName
+ } else {
+ val dataFiles = if (StringUtils.isNullOrEmpty(partitionCol)) {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath.toString))
+ } else {
+ metaClient.getStorage.listFiles(new
StoragePath(metaClient.getBasePath, "9"))
+ }
+ val baseFileFileStatus = dataFiles.stream().filter(fileStatus =>
fileStatus.getPath.getName.contains(lastCompletedCommit.getTimestamp)).findFirst().get()
+ baseFileName = baseFileFileStatus.getPath.getName
+ }
+
+ val latestCompletedFileName = lastCompletedCommit.getFileName
+ metaClient.getStorage.deleteFile(new
StoragePath(metaClient.getBasePath.toString + "/.hoodie/" +
latestCompletedFileName))
+
+ // re-create marker for the deleted file.
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ if (StringUtils.isNullOrEmpty(partitionCol)) {
+ metaClient.getStorage.create(new
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" +
lastCompletedCommit.getTimestamp + "/" + logFileName + ".marker.APPEND"))
+ } else {
+ metaClient.getStorage.create(new
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" +
lastCompletedCommit.getTimestamp + "/9/" + logFileName + ".marker.APPEND"))
+ }
+ } else {
+ if (StringUtils.isNullOrEmpty(partitionCol)) {
+ metaClient.getStorage.create(new
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" +
lastCompletedCommit.getTimestamp + "/" + baseFileName + ".marker.MERGE"))
+ } else {
+ metaClient.getStorage.create(new
StoragePath(metaClient.getBasePath.toString + "/.hoodie/.temp/" +
lastCompletedCommit.getTimestamp + "/9/" + baseFileName + ".marker.MERGE"))
+ }
+ }
+ }
+
+ @Test
+ def testMORDeleteBlocks(): Unit = {
+ val tableType: HoodieTableType = HoodieTableType.MERGE_ON_READ
+ val partitionCol = "c8"
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5"
+ ) ++ metadataOpts
+
+ // inserts
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ // updates
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ val expectedColStatsSourcePath =
"index/colstats/mor-delete-block1-column-stats-index-table.json"
+
+ // delete a subset of recs. this will add a delete log block for MOR table.
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/delete-input-table-json/",
+ expectedColStatsSourcePath = expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("", "c8"))
+ def testColStatsWithCleanCOW(partitionCol: String): Unit = {
+ val tableType: HoodieTableType = HoodieTableType.COPY_ON_WRITE
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+ HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1"
+ ) ++ metadataOpts
+
+ // inserts
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ val metadataOpts1 = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ // updates 1
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-clean1-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json"
+ }
+
+ // updates 2
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update3-input-table-json/",
+ expectedColStatsSourcePath = expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("", "c8"))
+ def testColStatsWithCleanMOR(partitionCol: String): Unit = {
+ val tableType: HoodieTableType = HoodieTableType.MERGE_ON_READ
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> partitionCol,
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+ HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1",
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2"
+ ) ++ metadataOpts
+
+ // inserts
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
+ dataSourcePath = "index/colstats/input-table-json",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ val metadataOpts1 = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ // updates 1
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update2-input-table-json/",
+ expectedColStatsSourcePath = null,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ false,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ val expectedColStatsSourcePath = if (testCase.tableType ==
HoodieTableType.COPY_ON_WRITE) {
+ "index/colstats/cow-clean1-column-stats-index-table.json"
+ } else {
+ "index/colstats/mor-clean1-column-stats-index-table.json"
+ }
+
+ // updates 2
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts1, commonOpts,
+ dataSourcePath = "index/colstats/update3-input-table-json/",
+ expectedColStatsSourcePath = expectedColStatsSourcePath,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ true,
+ numPartitions = 1,
+ parquetMaxFileSize = 100 * 1024 * 1024,
+ smallFileLimit = 0))
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ assertTrue(metaClient.getActiveTimeline.getCleanerTimeline.countInstants()
> 0)
+ }
@ParameterizedTest
@EnumSource(classOf[HoodieTableType])
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
index 1c731a6d0a6..effda5af016 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
@@ -28,7 +28,7 @@ import org.apache.hudi.common.model.{HoodieCommitMetadata,
HoodieTableType, Writ
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.timeline.{HoodieInstant,
MetadataConversionUtils}
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig,
HoodieWriteConfig}
-import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase
+import
org.apache.hudi.functional.ColumnStatIndexTestBase.{ColumnStatsTestCase,
ColumnStatsTestParams}
import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY
import org.apache.hudi.metadata.HoodieMetadataFileSystemView
import org.apache.hudi.util.JavaConversions
@@ -88,12 +88,12 @@ class TestColumnStatsIndexWithSQL extends
ColumnStatIndexTestBase {
HoodieIndexConfig.INDEX_TYPE.key() -> INMEMORY.name()
) ++ metadataOpts
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/input-table-json",
expectedColStatsSourcePath =
"index/colstats/column-stats-index-table.json",
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Overwrite,
- shouldValidate = false)
+ shouldValidate = false))
assertEquals(4, getLatestDataFilesCount(commonOpts))
assertEquals(0, getLatestDataFilesCount(commonOpts, includeLogFiles =
false))
@@ -133,12 +133,12 @@ class TestColumnStatsIndexWithSQL extends
ColumnStatIndexTestBase {
verifyFileIndexAndSQLQueries(commonOpts,
isTableDataSameAsAfterSecondInstant = true)
// Add the last df back and verify the queries
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/update-input-table-json",
expectedColStatsSourcePath = "",
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
- shouldValidate = false)
+ shouldValidate = false))
verifyFileIndexAndSQLQueries(commonOpts, verifyFileCount = false)
}
@@ -195,27 +195,27 @@ class TestColumnStatsIndexWithSQL extends
ColumnStatIndexTestBase {
writeClient.scheduleCompaction(org.apache.hudi.common.util.Option.empty())
writeClient.close()
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/update-input-table-json",
expectedColStatsSourcePath = "",
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
- shouldValidate = false)
+ shouldValidate = false))
verifyFileIndexAndSQLQueries(commonOpts)
}
private def setupTable(testCase: ColumnStatsTestCase, metadataOpts:
Map[String, String], commonOpts: Map[String, String], shouldValidate: Boolean):
Unit = {
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/input-table-json",
expectedColStatsSourcePath =
"index/colstats/column-stats-index-table.json",
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Overwrite)
+ saveMode = SaveMode.Overwrite))
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/another-input-table-json",
expectedColStatsSourcePath =
"index/colstats/updated-column-stats-index-table.json",
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Append)
+ saveMode = SaveMode.Append))
// NOTE: MOR and COW have different fixtures since MOR is bearing
delta-log files (holding
// deferred updates), diverging from COW
@@ -225,12 +225,12 @@ class TestColumnStatsIndexWithSQL extends
ColumnStatIndexTestBase {
"index/colstats/mor-updated2-column-stats-index-table.json"
}
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+ doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/update-input-table-json",
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
- shouldValidate)
+ shouldValidate))
}
def verifyFileIndexAndSQLQueries(opts: Map[String, String],
isTableDataSameAsAfterSecondInstant: Boolean = false, verifyFileCount: Boolean
= true): Unit = {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index c04dc09a69c..216d2b71d32 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -319,7 +319,6 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
spark.sql(s"update $tableName set not_record_key_col = 'xyz' where
record_key_col = 'row1'")
// validate the secondary index records themselves
checkAnswer(s"select key, SecondaryIndexMetadata.recordKey,
SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
- Seq("abc", "row1", true),
Seq("cde", "row2", false),
Seq("def", "row3", false),
Seq("xyz", "row1", false)
@@ -328,7 +327,7 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
checkAnswer(s"select ts, record_key_col, not_record_key_col,
partition_key_col from $tableName where record_key_col = 'row1'")(
Seq(1, "row1", "xyz", "p1")
)
- verifyQueryPredicate(hudiOpts, "not_record_key_col")
+ verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
}
}
@@ -516,8 +515,6 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
|FROM hudi_metadata('$basePath')
|WHERE type=7
""".stripMargin)(
- Seq("abc", "row1", true),
- Seq("cde", "row2", true),
Seq("value1_1", "row1", false),
Seq("value2_2", "row2", false)
)
@@ -605,7 +602,6 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
spark.sql(s"update $tableName set not_record_key_col = 'xyz' where
record_key_col = 'row1'")
// validate the secondary index records themselves
checkAnswer(s"select key, SecondaryIndexMetadata.recordKey,
SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
- Seq("abc", "row1", true),
Seq("cde", "row2", false),
Seq("def", "row3", false),
Seq("xyz", "row1", false)
@@ -614,7 +610,7 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
checkAnswer(s"select ts, record_key_col, not_record_key_col,
partition_key_col from $tableName where record_key_col = 'row1'")(
Seq(1, "row1", "xyz", "p1")
)
- verifyQueryPredicate(hudiOpts, "not_record_key_col")
+ verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
}
}
@@ -770,11 +766,11 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath))
// however index definition should still be present
assertTrue(metaClient.getIndexMetadata.isPresent &&
metaClient.getIndexMetadata.get.getIndexDefinitions.get(secondaryIndexPartition).getIndexType.equals("secondary_index"))
+
// update the secondary key column
spark.sql(s"update $tableName set not_record_key_col = 'xyz' where
record_key_col = 'row1'")
// validate the secondary index records themselves
checkAnswer(s"select key, SecondaryIndexMetadata.recordKey,
SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
- Seq("abc", "row1", true),
Seq("xyz", "row1", false)
)
}
@@ -784,10 +780,11 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
assertResult(expects.map(row => Row(row:
_*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))
}
- private def verifyQueryPredicate(hudiOpts: Map[String, String], columnName:
String): Unit = {
+ private def verifyQueryPredicate(hudiOpts: Map[String, String], columnName:
String, nonExistantKey: String = "abcdefghi"): Unit = {
mergedDfList = mergedDfList :+
spark.read.format("hudi").options(hudiOpts).load(basePath).repartition(1).cache()
- val secondaryKey = mergedDfList.last.limit(1).collect().map(row =>
row.getAs(columnName).toString)
- val dataFilter = EqualTo(attribute(columnName), Literal(secondaryKey(0)))
+ val secondaryKey = mergedDfList.last.limit(2).collect().filter(row =>
!row.getAs(columnName).toString.equals(nonExistantKey))
+ .map(row => row.getAs(columnName).toString).head
+ val dataFilter = EqualTo(attribute(columnName), Literal(secondaryKey))
verifyFilePruning(hudiOpts, dataFilter)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index 5a302304277..c4b76a6de88 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -634,8 +634,7 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
// verify there are new updates to functional index with isDeleted
true for cleaned file
checkAnswer(s"select ColumnStatsMetadata.minValue.member6.value,
ColumnStatsMetadata.maxValue.member6.value, ColumnStatsMetadata.isDeleted from
hudi_metadata('$tableName') where type=3 and
ColumnStatsMetadata.fileName='$fileName'")(
- Seq("2022-09-26", "2022-09-26", false),
- Seq(null, null, true) // for the cleaned file
+ Seq("2022-09-26", "2022-09-26", false) // for cleaned file,
there won't be any stats produced.
)
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index 455fd4b38f2..2763077a887 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -217,7 +217,7 @@ public class TestHoodieMetadataTableValidator extends
HoodieSparkClientTestBase
Dataset<Row> rows = getRowDataset(1, "row1", "abc", "p1");
rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
- rows = getRowDataset(2, "row2", "abc", "p2");
+ rows = getRowDataset(2, "row2", "ghi", "p2");
rows.write().format("hudi").mode(SaveMode.Append).save(basePath);
rows = getRowDataset(3, "row3", "def", "p2");
rows.write().format("hudi").mode(SaveMode.Append).save(basePath);