This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 940ec142389 [HUDI-6954] Fixing unpartitioned datasets for col stats and bloom filter partition in MDT (#10251)
940ec142389 is described below
commit 940ec1423894985f247ea4a9a75a69871ad31264
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Dec 8 09:22:09 2023 -0800
[HUDI-6954] Fixing unpartitioned datasets for col stats and bloom filter partition in MDT (#10251)
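The change routes every metadata table (MDT) key derivation for the FILES, column_stats, and bloom_filters partitions through dedicated identifier helpers, so the empty relative path of an unpartitioned table is translated to the non-partitioned sentinel consistently on the write and lookup paths. A minimal standalone sketch of that translation (constant names mirror org.apache.hudi.metadata.HoodieTableMetadata; the "." sentinel value is an assumption for illustration, not the shipped implementation):

    // Sketch of the identifier translation this commit centralizes.
    // EMPTY_PARTITION_NAME / NON_PARTITIONED_NAME mirror constants declared on
    // org.apache.hudi.metadata.HoodieTableMetadata; the "." value is assumed here.
    public class PartitionIdentifierSketch {
      static final String EMPTY_PARTITION_NAME = "";
      static final String NON_PARTITIONED_NAME = ".";

      // FILES, column_stats and bloom_filters must all map the empty relative path
      // of a non-partitioned table to the same sentinel, both when records are
      // written and when index keys are looked up.
      static String getPartitionIdentifier(String relativePartitionPath) {
        return EMPTY_PARTITION_NAME.equals(relativePartitionPath) ? NON_PARTITIONED_NAME : relativePartitionPath;
      }

      public static void main(String[] args) {
        System.out.println(getPartitionIdentifier(""));           // sentinel for non-partitioned tables
        System.out.println(getPartitionIdentifier("2015/03/16")); // partitioned paths pass through
      }
    }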
---
.../metadata/HoodieBackedTableMetadataWriter.java | 9 +-
.../hudi/client/TestJavaHoodieBackedMetadata.java | 2 +-
.../index/bloom/SparkHoodieBloomIndexHelper.java | 3 +-
.../functional/TestHoodieBackedMetadata.java | 23 +++-
.../apache/hudi/metadata/BaseTableMetadata.java | 4 +-
.../hudi/metadata/HoodieMetadataPayload.java | 17 +--
.../hudi/metadata/HoodieTableMetadataUtil.java | 38 ++++---
.../TestMetadataTableWithSparkDataSource.scala | 118 +++++++++++++++++++--
8 files changed, 175 insertions(+), 39 deletions(-)
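As the hunks below suggest, the write path already translated the partition path before encoding it into bloom filter and column stats keys, while the lookup path in SparkHoodieBloomIndexHelper and BaseTableMetadata encoded the raw path; for an unpartitioned table the reader therefore probed with a key derived from "" and missed records keyed by the sentinel. A hedged sketch of the now-shared derivation (Hudi's PartitionIndexID/FileIndexID produce base64-encoded, hash-based identifiers; plain Base64 stands in here to keep the example self-contained, and the "." sentinel is assumed):

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class BloomFilterKeySketch {
      // Stand-in for PartitionIndexID/FileIndexID#asBase64EncodedString().
      static String encode(String s) {
        return Base64.getEncoder().encodeToString(s.getBytes(StandardCharsets.UTF_8));
      }

      // Mirrors the shape of HoodieMetadataPayload.getBloomFilterRecordKey below:
      // normalize the partition path first, then concatenate the encoded parts.
      static String bloomFilterRecordKey(String partitionPath, String baseFileName) {
        String partitionId = partitionPath.isEmpty() ? "." : partitionPath; // assumed sentinel
        return encode(partitionId).concat(encode(baseFileName));
      }

      public static void main(String[] args) {
        // Writer and reader now derive the identical key for an unpartitioned table.
        System.out.println(bloomFilterRecordKey("", "f1_0-0-0_20231208.parquet"));
      }
    }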
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 781a9024117..6d849ea5dbe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -407,7 +407,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
}
Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
.map(p -> {
- String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+ String partitionName = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath());
return Pair.of(partitionName, p.getFileNameToSizeMap());
})
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
@@ -587,7 +587,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
// FILES partition uses a single file group
final int fileGroupCount = 1;
- List<String> partitions = partitionInfoList.stream().map(p -> HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()))
+ List<String> partitions = partitionInfoList.stream().map(p -> HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath()))
.collect(Collectors.toList());
final int totalDataFilesCount = partitionInfoList.stream().mapToInt(DirectoryInfo::getTotalFiles).sum();
LOG.info("Committing total {} partitions and {} files to metadata", partitions.size(), totalDataFilesCount);
@@ -603,8 +603,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
engineContext.setJobStatus(this.getClass().getSimpleName(), "Creating records for metadata FILES partition");
HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
Map<String, Long> fileNameToSizeMap = partitionInfo.getFileNameToSizeMap();
- return HoodieMetadataPayload.createPartitionFilesRecord(
- HoodieTableMetadataUtil.getPartitionIdentifier(partitionInfo.getRelativePath()), fileNameToSizeMap, Collections.emptyList());
+ return HoodieMetadataPayload.createPartitionFilesRecord(partitionInfo.getRelativePath(), fileNameToSizeMap, Collections.emptyList());
});
ValidationUtils.checkState(fileListRecords.count() == partitions.size());
@@ -1446,7 +1445,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
} else {
partitionPath = new Path(dataWriteConfig.getBasePath(), partition);
}
- final String partitionId = HoodieTableMetadataUtil.getPartitionIdentifier(partition);
+ final String partitionId = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(partition);
FileStatus[] metadataFiles = metadata.getAllFilesInPartition(partitionPath);
if (!dirInfoMap.containsKey(partition)) {
// Entire partition has been deleted
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index f0e55c8dc0d..e7b51e50bbb 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -1417,7 +1417,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
.forEach(partitionWriteStat -> {
String partitionStatName = partitionWriteStat.getKey();
List<HoodieWriteStat> writeStats = partitionWriteStat.getValue();
- String partition = HoodieTableMetadataUtil.getPartitionIdentifier(partitionStatName);
+ String partition = HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionStatName);
if (!commitToPartitionsToFiles.get(commitTime).containsKey(partition)) {
commitToPartitionsToFiles.get(commitTime).put(partition, new ArrayList<>());
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
index cd5f14d8fdf..0b3414a8a2d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
@@ -38,6 +38,7 @@ import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.FileStatus;
@@ -284,7 +285,7 @@ public class SparkHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper {
}
String bloomIndexEncodedKey =
- getBloomFilterIndexKey(new PartitionIndexID(partitionPath), new FileIndexID(baseFileName));
+ getBloomFilterIndexKey(new PartitionIndexID(HoodieTableMetadataUtil.getBloomFilterIndexPartitionIdentifier(partitionPath)), new FileIndexID(baseFileName));
// NOTE: It's crucial that [[targetPartitions]] be congruent w/ the number of
// actual file-groups in the Bloom Index in MT
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index ef5e0714a2f..4c7c63ed81b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1840,7 +1840,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
.forEach(partitionWriteStat -> {
String partitionStatName = partitionWriteStat.getKey();
List<HoodieWriteStat> writeStats = partitionWriteStat.getValue();
- String partition = HoodieTableMetadataUtil.getPartitionIdentifier(partitionStatName);
+ String partition = HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionStatName);
if (!commitToPartitionsToFiles.get(commitTime).containsKey(partition)) {
commitToPartitionsToFiles.get(commitTime).put(partition, new ArrayList<>());
}
@@ -2923,6 +2923,27 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
}
+ @Test
+ public void testNonPartitionedColStats() throws Exception {
+ init(HoodieTableType.COPY_ON_WRITE, false);
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+ HoodieTestDataGenerator nonPartitionedGenerator = new HoodieTestDataGenerator(new String[] {""});
+ HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).build()).build();
+ try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+ // Write 1 (Bulk insert)
+ String newCommitTime = "0000001";
+ List<HoodieRecord> records = nonPartitionedGenerator.generateInserts(newCommitTime, 10);
+ client.startCommitWithTime(newCommitTime);
+ List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+ validateMetadata(client);
+
+ List<String> metadataPartitions = metadata(client).getAllPartitionPaths();
+ assertTrue(metadataPartitions.contains(""), "Must contain empty partition");
+ }
+ }
+
/**
* Test various metrics published by metadata table.
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 7e1acf3a87c..1b7c2db2daa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -197,7 +197,7 @@ public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata {
Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
final String bloomFilterIndexKey = HoodieMetadataPayload.getBloomFilterIndexKey(
- new PartitionIndexID(partitionNameFileNamePair.getLeft()), new FileIndexID(partitionNameFileNamePair.getRight()));
+ new PartitionIndexID(HoodieTableMetadataUtil.getBloomFilterIndexPartitionIdentifier(partitionNameFileNamePair.getLeft())), new FileIndexID(partitionNameFileNamePair.getRight()));
partitionIDFileIDStrings.add(bloomFilterIndexKey);
fileToKeyMap.put(bloomFilterIndexKey, partitionNameFileNamePair);
});
@@ -245,7 +245,7 @@ public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata {
final ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
for (Pair<String, String> partitionNameFileNamePair : partitionNameFileNameList) {
final String columnStatsIndexKey = HoodieMetadataPayload.getColumnStatsIndexKey(
- new PartitionIndexID(partitionNameFileNamePair.getLeft()),
+ new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionNameFileNamePair.getLeft())),
new FileIndexID(partitionNameFileNamePair.getRight()), columnIndexID);
columnStatKeyset.add(columnStatsIndexKey);
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 3627161559a..080aeaec6a5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -72,7 +72,6 @@ import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe;
import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionIdentifier;
/**
* MetadataTable records are persisted with the schema defined in HoodieMetadata.avsc.
@@ -310,7 +309,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
*/
public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions, boolean isDeleted) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
- partitions.forEach(partition -> fileInfo.put(getPartitionIdentifier(partition), new HoodieMetadataFileInfo(0L, isDeleted)));
+ partitions.forEach(partition -> fileInfo.put(HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(partition), new HoodieMetadataFileInfo(0L, isDeleted)));
HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath());
HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
@@ -328,13 +327,14 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
public static HoodieRecord<HoodieMetadataPayload> createPartitionFilesRecord(String partition,
Map<String, Long> filesAdded,
List<String> filesDeleted) {
+ String partitionIdentifier = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(partition);
int size = filesAdded.size() + filesDeleted.size();
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(size, 1);
filesAdded.forEach((fileName, fileSize) -> fileInfo.put(fileName, new HoodieMetadataFileInfo(fileSize, false)));
filesDeleted.forEach(fileName -> fileInfo.put(fileName, DELETE_FILE_METADATA));
- HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.getPartitionPath());
+ HoodieKey key = new HoodieKey(partitionIdentifier, MetadataPartitionType.FILES.getPartitionPath());
HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
return new HoodieAvroRecord<>(key, payload);
}
@@ -358,8 +358,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
checkArgument(!baseFileName.contains(Path.SEPARATOR)
&& FSUtils.isBaseFile(new Path(baseFileName)),
"Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
- final String bloomFilterIndexKey = new PartitionIndexID(partitionName).asBase64EncodedString()
- .concat(new FileIndexID(baseFileName).asBase64EncodedString());
+ final String bloomFilterIndexKey = getBloomFilterRecordKey(partitionName, baseFileName);
HoodieKey key = new HoodieKey(bloomFilterIndexKey, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
HoodieMetadataBloomFilter metadataBloomFilter =
@@ -408,6 +407,11 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
}
}
+ private static String getBloomFilterRecordKey(String partitionName, String fileName) {
+ return new PartitionIndexID(HoodieTableMetadataUtil.getBloomFilterIndexPartitionIdentifier(partitionName)).asBase64EncodedString()
+ .concat(new FileIndexID(fileName).asBase64EncodedString());
+ }
+
private HoodieMetadataBloomFilter combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) {
// Bloom filters are always additive. No need to merge with previous bloom filter
return this.bloomFilterMetadata;
@@ -606,7 +610,8 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
* @return Column stats index key
*/
public static String getColumnStatsIndexKey(String partitionName, HoodieColumnRangeMetadata<Comparable> columnRangeMetadata) {
- final PartitionIndexID partitionIndexID = new PartitionIndexID(partitionName);
+
+ final PartitionIndexID partitionIndexID = new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionName));
final FileIndexID fileIndexID = new FileIndexID(new Path(columnRangeMetadata.getFilePath()).getName());
final ColumnIndexID columnIndexID = new ColumnIndexID(columnRangeMetadata.getColumnName());
return getColumnStatsIndexKey(partitionIndexID, fileIndexID, columnIndexID);
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 2576f38c127..839a7ed41a3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -400,8 +400,6 @@ public class HoodieTableMetadataUtil {
String partitionStatName = entry.getKey();
List<HoodieWriteStat> writeStats = entry.getValue();
- String partition = getPartitionIdentifier(partitionStatName);
-
HashMap<String, Long> updatedFilesToSizesMapping = writeStats.stream().reduce(new HashMap<>(writeStats.size()),
(map, stat) -> {
@@ -431,7 +429,7 @@ public class HoodieTableMetadataUtil {
CollectionUtils::combine);
newFileCount.add(updatedFilesToSizesMapping.size());
- return HoodieMetadataPayload.createPartitionFilesRecord(partition, updatedFilesToSizesMapping,
+ return HoodieMetadataPayload.createPartitionFilesRecord(partitionStatName, updatedFilesToSizesMapping,
Collections.emptyList());
})
.collect(Collectors.toList());
@@ -447,7 +445,7 @@ public class HoodieTableMetadataUtil {
private static List<String> getPartitionsAdded(HoodieCommitMetadata commitMetadata) {
return commitMetadata.getPartitionToWriteStats().keySet().stream()
// We need to make sure we properly handle case of non-partitioned tables
- .map(HoodieTableMetadataUtil::getPartitionIdentifier)
+ .map(HoodieTableMetadataUtil::getPartitionIdentifierForFilesPartition)
.collect(Collectors.toList());
}
@@ -557,10 +555,9 @@ public class HoodieTableMetadataUtil {
int[] fileDeleteCount = {0};
List<String> deletedPartitions = new ArrayList<>();
cleanMetadata.getPartitionMetadata().forEach((partitionName, partitionMetadata) -> {
- final String partition = getPartitionIdentifier(partitionName);
// Files deleted from a partition
List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
- HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Collections.emptyMap(),
+ HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partitionName, Collections.emptyMap(),
deletedFiles);
records.add(record);
fileDeleteCount[0] += deletedFiles.size();
@@ -712,7 +709,7 @@ public class HoodieTableMetadataUtil {
dataTableMetaClient.getActiveTimeline().readRollbackInfoAsBytes(requested).get(), HoodieRollbackPlan.class);
rollbackPlan.getRollbackRequests().forEach(rollbackRequest -> {
- final String partitionId = getPartitionIdentifier(rollbackRequest.getPartitionPath());
+ final String partitionId = getPartitionIdentifierForFilesPartition(rollbackRequest.getPartitionPath());
partitionToFilesMap.computeIfAbsent(partitionId, s -> new HashMap<>());
// fetch only log files that are expected to be RB'd in DT as part of this rollback. these log files will not be deleted, but rendered
// invalid once rollback is complete.
@@ -759,7 +756,7 @@ public class HoodieTableMetadataUtil {
// Has this rollback produced new files?
boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null && !pm.getRollbackLogFiles().isEmpty();
final String partition = pm.getPartitionPath();
- final String partitionId = getPartitionIdentifier(partition);
+ final String partitionId = getPartitionIdentifierForFilesPartition(partition);
BiFunction<Long, Long, Long> fileMergeFn = (oldSize, newSizeCopy) -> {
// if a file exists in both written log files and rollback log files, we want to pick the one that is higher
@@ -792,20 +789,19 @@ public class HoodieTableMetadataUtil {
partitionToDeletedFiles.forEach((partitionName, deletedFiles) -> {
fileChangeCount[0] += deletedFiles.size();
- final String partition = getPartitionIdentifier(partitionName);
Map<String, Long> filesAdded = Collections.emptyMap();
if (partitionToAppendedFiles.containsKey(partitionName)) {
filesAdded = partitionToAppendedFiles.remove(partitionName);
}
- HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
+ HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partitionName, filesAdded,
deletedFiles);
records.add(record);
});
partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
- final String partition = getPartitionIdentifier(partitionName);
+ final String partition = getPartitionIdentifierForFilesPartition(partitionName);
fileChangeCount[1] += appendedFileMap.size();
// Validate that no appended file has been deleted
@@ -825,10 +821,22 @@ public class HoodieTableMetadataUtil {
return records;
}
+ public static String getColumnStatsIndexPartitionIdentifier(String partitionName) {
+ return getPartitionIdentifier(partitionName);
+ }
+
+ public static String getBloomFilterIndexPartitionIdentifier(String partitionName) {
+ return getPartitionIdentifier(partitionName);
+ }
+
+ public static String getPartitionIdentifierForFilesPartition(String relativePartitionPath) {
+ return getPartitionIdentifier(relativePartitionPath);
+ }
+
/**
* Returns partition name for the given path.
*/
- public static String getPartitionIdentifier(@Nonnull String relativePartitionPath) {
+ private static String getPartitionIdentifier(@Nonnull String relativePartitionPath) {
return EMPTY_PARTITION_NAME.equals(relativePartitionPath) ? NON_PARTITIONED_NAME : relativePartitionPath;
}
@@ -868,9 +876,8 @@ public class HoodieTableMetadataUtil {
}
}
- final String partition = getPartitionIdentifier(partitionName);
return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
- partition, filename, instantTime, recordsGenerationParams.getBloomFilterType(), bloomFilterBuffer, partitionFileFlagTuple.f2))
+ partitionName, filename, instantTime, recordsGenerationParams.getBloomFilterType(), bloomFilterBuffer, partitionFileFlagTuple.f2))
.iterator();
});
}
@@ -909,8 +916,7 @@ public class HoodieTableMetadataUtil {
}
final String filePathWithPartition = partitionName + "/" + filename;
- final String partitionId = getPartitionIdentifier(partitionName);
- return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, isDeleted).iterator();
+ return getColumnStatsRecords(partitionName, filePathWithPartition, dataTableMetaClient, columnsToIndex, isDeleted).iterator();
});
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
index aa40e8c5156..168176b75c8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
@@ -18,11 +18,17 @@
package org.apache.hudi.functional
+import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.util.{ParquetUtils, StringUtils}
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.metadata.{BaseTableMetadata, HoodieBackedTableMetadata, HoodieTableMetadata, MetadataPartitionType}
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
import org.apache.spark.SparkConf
@@ -30,30 +36,34 @@ import org.apache.spark.sql.SaveMode
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Tag
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
+import java.util
+import java.util.Collections
import scala.collection.JavaConverters._
@Tag("functional")
class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarness {
val hudi = "org.apache.hudi"
- var commonOpts = Map(
+ var nonPartitionedCommonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
"hoodie.bulkinsert.shuffle.parallelism" -> "2",
"hoodie.delete.shuffle.parallelism" -> "1",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
- DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)
+ var partitionedCommonOpts = nonPartitionedCommonOpts ++ Map(
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition")
+
override def conf: SparkConf = conf(getSparkSqlConf)
@ParameterizedTest
- @ValueSource(ints = Array(1/*, 5*/)) // TODO: fix for higher compactNumDeltaCommits - HUDI-6340
- def testReadability(compactNumDeltaCommits: Int): Unit = {
+ @CsvSource(Array("1,true", "1,false")) // TODO: fix for higher compactNumDeltaCommits - HUDI-6340
+ def testReadability(compactNumDeltaCommits: Int, testPartitioned: Boolean): Unit = {
val dataGen = new HoodieTestDataGenerator()
val metadataOpts: Map[String, String] = Map(
@@ -61,6 +71,12 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
)
+ val commonOpts = if (testPartitioned) {
+ partitionedCommonOpts
+ } else {
+ nonPartitionedCommonOpts
+ }
+
val combinedOpts: Map[String, String] = commonOpts ++ metadataOpts ++
Map(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> compactNumDeltaCommits.toString)
@@ -84,16 +100,23 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
.mode(SaveMode.Append)
.save(basePath)
+ if (testPartitioned) {
+ validatePartitionedTable(basePath)
+ } else {
+ validateUnPartitionedTable(basePath)
+ }
+ }
+
+ private def validatePartitionedTable(basePath: String) : Unit = {
// Files partition of MT
val filesPartitionDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/files")
-
// Smoke test
filesPartitionDF.show()
// Query w/ 0 requested columns should be working fine
assertEquals(4, filesPartitionDF.count())
- val expectedKeys = Seq("2015/03/16", "2015/03/17", "2016/03/15", "__all_partitions__")
+ val expectedKeys = Seq("2015/03/16", "2015/03/17", "2016/03/15", HoodieTableMetadata.RECORDKEY_PARTITION_LIST)
val keys = filesPartitionDF.select("key")
.collect()
.map(_.getString(0))
@@ -104,9 +127,90 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
// Column Stats Index partition of MT
val colStatsDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
+ // Smoke test
+ colStatsDF.show()
+
+ // lets pick one data file and validate col stats
+ val partitionPathToTest = "2015/03/16"
+ val engineContext = new HoodieSparkEngineContext(jsc())
+ val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).build();
+ val baseTableMetada : HoodieTableMetadata = new HoodieBackedTableMetadata(engineContext, metadataConfig, s"$basePath", false)
+
+ val fileStatuses = baseTableMetada.getAllFilesInPartition(new Path(s"$basePath/" + partitionPathToTest))
+ val fileName = fileStatuses.apply(0).getPath.getName
+
+ val partitionFileNamePair : java.util.List[org.apache.hudi.common.util.collection.Pair[String, String]] = new util.ArrayList
+ partitionFileNamePair.add(org.apache.hudi.common.util.collection.Pair.of(partitionPathToTest,fileName))
+
+ val colStatsRecords = baseTableMetada.getColumnStats(partitionFileNamePair, "begin_lat")
+ assertEquals(colStatsRecords.size(), 1)
+ val metadataColStats = colStatsRecords.get(partitionFileNamePair.get(0))
+
+ // read parquet file and verify stats
+ val colRangeMetadataList: java.util.List[HoodieColumnRangeMetadata[Comparable[_]]] = new ParquetUtils()
+ .readRangeFromParquetMetadata(jsc().hadoopConfiguration(), fileStatuses.apply(0).getPath, Collections.singletonList("begin_lat"))
+ val columnRangeMetadata = colRangeMetadataList.get(0)
+
+ assertEquals(metadataColStats.getValueCount, columnRangeMetadata.getValueCount)
+ assertEquals(metadataColStats.getTotalSize, columnRangeMetadata.getTotalSize)
+ assertEquals(HoodieAvroUtils.unwrapAvroValueWrapper(metadataColStats.getMaxValue), columnRangeMetadata.getMaxValue)
+ assertEquals(HoodieAvroUtils.unwrapAvroValueWrapper(metadataColStats.getMinValue), columnRangeMetadata.getMinValue)
+ assertEquals(metadataColStats.getFileName, fileName)
+ }
+
+ private def validateUnPartitionedTable(basePath: String) : Unit = {
+ // Files partition of MT
+ val filesPartitionDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/files")
+ // Smoke test
+ filesPartitionDF.show()
+ // Query w/ 0 requested columns should be working fine
+ assertEquals(2, filesPartitionDF.count())
+
+ val expectedKeys = Seq(HoodieTableMetadata.NON_PARTITIONED_NAME, HoodieTableMetadata.RECORDKEY_PARTITION_LIST)
+ val keys = filesPartitionDF.select("key")
+ .collect()
+ .map(_.getString(0))
+ .toSeq
+ .sorted
+
+ assertEquals(expectedKeys, keys)
+
+ // Column Stats Index partition of MT
+ val colStatsDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
// Smoke test
colStatsDF.show()
+
+ // lets pick one data file and validate col stats
+ val partitionPathToTest = ""
+ val engineContext = new HoodieSparkEngineContext(jsc())
+ val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).build();
+ val baseTableMetada : HoodieTableMetadata = new HoodieBackedTableMetadata(engineContext, metadataConfig, s"$basePath", false)
+
+ val allPartitionPaths = baseTableMetada.getAllPartitionPaths
+ assertEquals(allPartitionPaths.size(), 1)
+ assertEquals(allPartitionPaths.get(0), HoodieTableMetadata.EMPTY_PARTITION_NAME)
+
+ val fileStatuses = baseTableMetada.getAllFilesInPartition(new Path(s"$basePath/"))
+ val fileName = fileStatuses.apply(0).getPath.getName
+
+ val partitionFileNamePair : java.util.List[org.apache.hudi.common.util.collection.Pair[String, String]] = new util.ArrayList
+ partitionFileNamePair.add(org.apache.hudi.common.util.collection.Pair.of(partitionPathToTest,fileName))
+
+ val colStatsRecords = baseTableMetada.getColumnStats(partitionFileNamePair, "begin_lat")
+ assertEquals(colStatsRecords.size(), 1)
+ val metadataColStats = colStatsRecords.get(partitionFileNamePair.get(0))
+
+ // read parquet file and verify stats
+ val colRangeMetadataList: java.util.List[HoodieColumnRangeMetadata[Comparable[_]]] = new ParquetUtils()
+ .readRangeFromParquetMetadata(jsc().hadoopConfiguration(), fileStatuses.apply(0).getPath, Collections.singletonList("begin_lat"))
+ val columnRangeMetadata = colRangeMetadataList.get(0)
+
+ assertEquals(metadataColStats.getValueCount, columnRangeMetadata.getValueCount)
+ assertEquals(metadataColStats.getTotalSize, columnRangeMetadata.getTotalSize)
+ assertEquals(HoodieAvroUtils.unwrapAvroValueWrapper(metadataColStats.getMaxValue), columnRangeMetadata.getMaxValue)
+ assertEquals(HoodieAvroUtils.unwrapAvroValueWrapper(metadataColStats.getMinValue), columnRangeMetadata.getMinValue)
+ assertEquals(metadataColStats.getFileName, fileName)
}
private def parseRecords(records: Seq[String]) = {