This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 940ec142389 [HUDI-6954] Fixing unpartitioned datasets for col stats 
and bloom filter partition in MDT (#10251)
940ec142389 is described below

commit 940ec1423894985f247ea4a9a75a69871ad31264
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Dec 8 09:22:09 2023 -0800

    [HUDI-6954] Fixing unpartitioned datasets for col stats and bloom filter 
partition in MDT (#10251)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |   9 +-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |   2 +-
 .../index/bloom/SparkHoodieBloomIndexHelper.java   |   3 +-
 .../functional/TestHoodieBackedMetadata.java       |  23 +++-
 .../apache/hudi/metadata/BaseTableMetadata.java    |   4 +-
 .../hudi/metadata/HoodieMetadataPayload.java       |  17 +--
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  38 ++++---
 .../TestMetadataTableWithSparkDataSource.scala     | 118 +++++++++++++++++++--
 8 files changed, 175 insertions(+), 39 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 781a9024117..6d849ea5dbe 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -407,7 +407,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     }
     Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
         .map(p -> {
-          String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+          String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath());
           return Pair.of(partitionName, p.getFileNameToSizeMap());
         })
         .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
@@ -587,7 +587,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     // FILES partition uses a single file group
     final int fileGroupCount = 1;
 
-    List<String> partitions = partitionInfoList.stream().map(p -> 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()))
+    List<String> partitions = partitionInfoList.stream().map(p -> 
HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath()))
         .collect(Collectors.toList());
     final int totalDataFilesCount = 
partitionInfoList.stream().mapToInt(DirectoryInfo::getTotalFiles).sum();
     LOG.info("Committing total {} partitions and {} files to metadata", 
partitions.size(), totalDataFilesCount);
@@ -603,8 +603,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     engineContext.setJobStatus(this.getClass().getSimpleName(), "Creating 
records for metadata FILES partition");
     HoodieData<HoodieRecord> fileListRecords = 
engineContext.parallelize(partitionInfoList, 
partitionInfoList.size()).map(partitionInfo -> {
       Map<String, Long> fileNameToSizeMap = 
partitionInfo.getFileNameToSizeMap();
-      return HoodieMetadataPayload.createPartitionFilesRecord(
-          
HoodieTableMetadataUtil.getPartitionIdentifier(partitionInfo.getRelativePath()),
 fileNameToSizeMap, Collections.emptyList());
+      return 
HoodieMetadataPayload.createPartitionFilesRecord(partitionInfo.getRelativePath(),
 fileNameToSizeMap, Collections.emptyList());
     });
     ValidationUtils.checkState(fileListRecords.count() == partitions.size());
 
@@ -1446,7 +1445,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       } else {
         partitionPath = new Path(dataWriteConfig.getBasePath(), partition);
       }
-      final String partitionId = 
HoodieTableMetadataUtil.getPartitionIdentifier(partition);
+      final String partitionId = 
HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(partition);
       FileStatus[] metadataFiles = 
metadata.getAllFilesInPartition(partitionPath);
       if (!dirInfoMap.containsKey(partition)) {
         // Entire partition has been deleted
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index f0e55c8dc0d..e7b51e50bbb 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -1417,7 +1417,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
               .forEach(partitionWriteStat -> {
                 String partitionStatName = partitionWriteStat.getKey();
                 List<HoodieWriteStat> writeStats = 
partitionWriteStat.getValue();
-                String partition = 
HoodieTableMetadataUtil.getPartitionIdentifier(partitionStatName);
+                String partition = 
HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionStatName);
                 if 
(!commitToPartitionsToFiles.get(commitTime).containsKey(partition)) {
                   commitToPartitionsToFiles.get(commitTime).put(partition, new 
ArrayList<>());
                 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
index cd5f14d8fdf..0b3414a8a2d 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
@@ -38,6 +38,7 @@ import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -284,7 +285,7 @@ public class SparkHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper {
       }
 
       String bloomIndexEncodedKey =
-          getBloomFilterIndexKey(new PartitionIndexID(partitionPath), new 
FileIndexID(baseFileName));
+          getBloomFilterIndexKey(new 
PartitionIndexID(HoodieTableMetadataUtil.getBloomFilterIndexPartitionIdentifier(partitionPath)),
 new FileIndexID(baseFileName));
 
       // NOTE: It's crucial that [[targetPartitions]] be congruent w/ the 
number of
       //       actual file-groups in the Bloom Index in MT
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index ef5e0714a2f..4c7c63ed81b 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1840,7 +1840,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
               .forEach(partitionWriteStat -> {
                 String partitionStatName = partitionWriteStat.getKey();
                 List<HoodieWriteStat> writeStats = 
partitionWriteStat.getValue();
-                String partition = 
HoodieTableMetadataUtil.getPartitionIdentifier(partitionStatName);
+                String partition = 
HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionStatName);
                 if 
(!commitToPartitionsToFiles.get(commitTime).containsKey(partition)) {
                   commitToPartitionsToFiles.get(commitTime).put(partition, new 
ArrayList<>());
                 }
@@ -2923,6 +2923,27 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     }
   }
 
+  @Test
+  public void testNonPartitionedColStats() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE, false);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    HoodieTestDataGenerator nonPartitionedGenerator = new 
HoodieTestDataGenerator(new String[] {""});
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).build()).build();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
+      // Write 1 (Bulk insert)
+      String newCommitTime = "0000001";
+      List<HoodieRecord> records = 
nonPartitionedGenerator.generateInserts(newCommitTime, 10);
+      client.startCommitWithTime(newCommitTime);
+      List<WriteStatus> writeStatuses = 
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      validateMetadata(client);
+
+      List<String> metadataPartitions = 
metadata(client).getAllPartitionPaths();
+      assertTrue(metadataPartitions.contains(""), "Must contain empty 
partition");
+    }
+  }
+
   /**
    * Test various metrics published by metadata table.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 7e1acf3a87c..1b7c2db2daa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -197,7 +197,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
     Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
     partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
       final String bloomFilterIndexKey = 
HoodieMetadataPayload.getBloomFilterIndexKey(
-          new PartitionIndexID(partitionNameFileNamePair.getLeft()), new 
FileIndexID(partitionNameFileNamePair.getRight()));
+          new 
PartitionIndexID(HoodieTableMetadataUtil.getBloomFilterIndexPartitionIdentifier(partitionNameFileNamePair.getLeft())),
 new FileIndexID(partitionNameFileNamePair.getRight()));
       partitionIDFileIDStrings.add(bloomFilterIndexKey);
       fileToKeyMap.put(bloomFilterIndexKey, partitionNameFileNamePair);
     });
@@ -245,7 +245,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
     final ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
     for (Pair<String, String> partitionNameFileNamePair : 
partitionNameFileNameList) {
       final String columnStatsIndexKey = 
HoodieMetadataPayload.getColumnStatsIndexKey(
-          new PartitionIndexID(partitionNameFileNamePair.getLeft()),
+          new 
PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionNameFileNamePair.getLeft())),
           new FileIndexID(partitionNameFileNamePair.getRight()),
           columnIndexID);
       columnStatKeyset.add(columnStatsIndexKey);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 3627161559a..080aeaec6a5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -72,7 +72,6 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionIdentifier;
 
 /**
  * MetadataTable records are persisted with the schema defined in 
HoodieMetadata.avsc.
@@ -310,7 +309,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionListRecord(List<String> partitions, boolean isDeleted) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    partitions.forEach(partition -> 
fileInfo.put(getPartitionIdentifier(partition), new HoodieMetadataFileInfo(0L, 
isDeleted)));
+    partitions.forEach(partition -> 
fileInfo.put(HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(partition),
 new HoodieMetadataFileInfo(0L, isDeleted)));
 
     HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.FILES.getPartitionPath());
     HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
@@ -328,13 +327,14 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionFilesRecord(String partition,
                                                                                
Map<String, Long> filesAdded,
                                                                                
List<String> filesDeleted) {
+    String partitionIdentifier = 
HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(partition);
     int size = filesAdded.size() + filesDeleted.size();
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(size, 1);
     filesAdded.forEach((fileName, fileSize) -> fileInfo.put(fileName, new 
HoodieMetadataFileInfo(fileSize, false)));
 
     filesDeleted.forEach(fileName -> fileInfo.put(fileName, 
DELETE_FILE_METADATA));
 
-    HoodieKey key = new HoodieKey(partition, 
MetadataPartitionType.FILES.getPartitionPath());
+    HoodieKey key = new HoodieKey(partitionIdentifier, 
MetadataPartitionType.FILES.getPartitionPath());
     HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
     return new HoodieAvroRecord<>(key, payload);
   }
@@ -358,8 +358,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
     checkArgument(!baseFileName.contains(Path.SEPARATOR)
             && FSUtils.isBaseFile(new Path(baseFileName)),
         "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
-    final String bloomFilterIndexKey = new 
PartitionIndexID(partitionName).asBase64EncodedString()
-        .concat(new FileIndexID(baseFileName).asBase64EncodedString());
+    final String bloomFilterIndexKey = getBloomFilterRecordKey(partitionName, 
baseFileName);
     HoodieKey key = new HoodieKey(bloomFilterIndexKey, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
 
     HoodieMetadataBloomFilter metadataBloomFilter =
@@ -408,6 +407,11 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
     }
   }
 
+  private static String getBloomFilterRecordKey(String partitionName, String 
fileName) {
+    return new 
PartitionIndexID(HoodieTableMetadataUtil.getBloomFilterIndexPartitionIdentifier(partitionName)).asBase64EncodedString()
+        .concat(new FileIndexID(fileName).asBase64EncodedString());
+  }
+
   private HoodieMetadataBloomFilter 
combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) {
     // Bloom filters are always additive. No need to merge with previous bloom 
filter
     return this.bloomFilterMetadata;
@@ -606,7 +610,8 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
    * @return Column stats index key
    */
   public static String getColumnStatsIndexKey(String partitionName, 
HoodieColumnRangeMetadata<Comparable> columnRangeMetadata) {
-    final PartitionIndexID partitionIndexID = new 
PartitionIndexID(partitionName);
+
+    final PartitionIndexID partitionIndexID = new 
PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionName));
     final FileIndexID fileIndexID = new FileIndexID(new 
Path(columnRangeMetadata.getFilePath()).getName());
     final ColumnIndexID columnIndexID = new 
ColumnIndexID(columnRangeMetadata.getColumnName());
     return getColumnStatsIndexKey(partitionIndexID, fileIndexID, 
columnIndexID);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 2576f38c127..839a7ed41a3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -400,8 +400,6 @@ public class HoodieTableMetadataUtil {
               String partitionStatName = entry.getKey();
               List<HoodieWriteStat> writeStats = entry.getValue();
 
-              String partition = getPartitionIdentifier(partitionStatName);
-
               HashMap<String, Long> updatedFilesToSizesMapping =
                   writeStats.stream().reduce(new HashMap<>(writeStats.size()),
                       (map, stat) -> {
@@ -431,7 +429,7 @@ public class HoodieTableMetadataUtil {
                       CollectionUtils::combine);
 
               newFileCount.add(updatedFilesToSizesMapping.size());
-              return 
HoodieMetadataPayload.createPartitionFilesRecord(partition, 
updatedFilesToSizesMapping,
+              return 
HoodieMetadataPayload.createPartitionFilesRecord(partitionStatName, 
updatedFilesToSizesMapping,
                   Collections.emptyList());
             })
             .collect(Collectors.toList());
@@ -447,7 +445,7 @@ public class HoodieTableMetadataUtil {
   private static List<String> getPartitionsAdded(HoodieCommitMetadata 
commitMetadata) {
     return commitMetadata.getPartitionToWriteStats().keySet().stream()
         // We need to make sure we properly handle case of non-partitioned 
tables
-        .map(HoodieTableMetadataUtil::getPartitionIdentifier)
+        .map(HoodieTableMetadataUtil::getPartitionIdentifierForFilesPartition)
         .collect(Collectors.toList());
   }
 
@@ -557,10 +555,9 @@ public class HoodieTableMetadataUtil {
     int[] fileDeleteCount = {0};
     List<String> deletedPartitions = new ArrayList<>();
     cleanMetadata.getPartitionMetadata().forEach((partitionName, 
partitionMetadata) -> {
-      final String partition = getPartitionIdentifier(partitionName);
       // Files deleted from a partition
       List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
-      HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partition, 
Collections.emptyMap(),
+      HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partitionName, 
Collections.emptyMap(),
           deletedFiles);
       records.add(record);
       fileDeleteCount[0] += deletedFiles.size();
@@ -712,7 +709,7 @@ public class HoodieTableMetadataUtil {
           
dataTableMetaClient.getActiveTimeline().readRollbackInfoAsBytes(requested).get(),
 HoodieRollbackPlan.class);
 
       rollbackPlan.getRollbackRequests().forEach(rollbackRequest -> {
-        final String partitionId = 
getPartitionIdentifier(rollbackRequest.getPartitionPath());
+        final String partitionId = 
getPartitionIdentifierForFilesPartition(rollbackRequest.getPartitionPath());
         partitionToFilesMap.computeIfAbsent(partitionId, s -> new HashMap<>());
         // fetch only log files that are expected to be RB'd in DT as part of 
this rollback. these log files will not be deleted, but rendered
         // invalid once rollback is complete.
@@ -759,7 +756,7 @@ public class HoodieTableMetadataUtil {
       // Has this rollback produced new files?
       boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null && 
!pm.getRollbackLogFiles().isEmpty();
       final String partition = pm.getPartitionPath();
-      final String partitionId = getPartitionIdentifier(partition);
+      final String partitionId = 
getPartitionIdentifierForFilesPartition(partition);
 
       BiFunction<Long, Long, Long> fileMergeFn = (oldSize, newSizeCopy) -> {
         // if a file exists in both written log files and rollback log files, 
we want to pick the one that is higher
@@ -792,20 +789,19 @@ public class HoodieTableMetadataUtil {
 
     partitionToDeletedFiles.forEach((partitionName, deletedFiles) -> {
       fileChangeCount[0] += deletedFiles.size();
-      final String partition = getPartitionIdentifier(partitionName);
 
       Map<String, Long> filesAdded = Collections.emptyMap();
       if (partitionToAppendedFiles.containsKey(partitionName)) {
         filesAdded = partitionToAppendedFiles.remove(partitionName);
       }
 
-      HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
+      HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partitionName, filesAdded,
           deletedFiles);
       records.add(record);
     });
 
     partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
-      final String partition = getPartitionIdentifier(partitionName);
+      final String partition = 
getPartitionIdentifierForFilesPartition(partitionName);
       fileChangeCount[1] += appendedFileMap.size();
 
       // Validate that no appended file has been deleted
@@ -825,10 +821,22 @@ public class HoodieTableMetadataUtil {
     return records;
   }
 
+  public static String getColumnStatsIndexPartitionIdentifier(String 
partitionName) {
+    return getPartitionIdentifier(partitionName);
+  }
+
+  public static String getBloomFilterIndexPartitionIdentifier(String 
partitionName) {
+    return getPartitionIdentifier(partitionName);
+  }
+
+  public static String getPartitionIdentifierForFilesPartition(String 
relativePartitionPath) {
+    return getPartitionIdentifier(relativePartitionPath);
+  }
+
   /**
    * Returns partition name for the given path.
    */
-  public static String getPartitionIdentifier(@Nonnull String 
relativePartitionPath) {
+  private static String getPartitionIdentifier(@Nonnull String 
relativePartitionPath) {
     return EMPTY_PARTITION_NAME.equals(relativePartitionPath) ? 
NON_PARTITIONED_NAME : relativePartitionPath;
   }
 
@@ -868,9 +876,8 @@ public class HoodieTableMetadataUtil {
         }
       }
 
-      final String partition = getPartitionIdentifier(partitionName);
       return 
Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
-              partition, filename, instantTime, 
recordsGenerationParams.getBloomFilterType(), bloomFilterBuffer, 
partitionFileFlagTuple.f2))
+              partitionName, filename, instantTime, 
recordsGenerationParams.getBloomFilterType(), bloomFilterBuffer, 
partitionFileFlagTuple.f2))
           .iterator();
     });
   }
@@ -909,8 +916,7 @@ public class HoodieTableMetadataUtil {
       }
 
       final String filePathWithPartition = partitionName + "/" + filename;
-      final String partitionId = getPartitionIdentifier(partitionName);
-      return getColumnStatsRecords(partitionId, filePathWithPartition, 
dataTableMetaClient, columnsToIndex, isDeleted).iterator();
+      return getColumnStatsRecords(partitionName, filePathWithPartition, 
dataTableMetaClient, columnsToIndex, isDeleted).iterator();
     });
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
index aa40e8c5156..168176b75c8 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
@@ -18,11 +18,17 @@
 
 package org.apache.hudi.functional
 
+import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.util.{ParquetUtils, StringUtils}
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.metadata.{BaseTableMetadata, HoodieBackedTableMetadata, 
HoodieTableMetadata, MetadataPartitionType}
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import 
org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
 import org.apache.spark.SparkConf
@@ -30,30 +36,34 @@ import org.apache.spark.sql.SaveMode
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Tag
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
+import java.util
+import java.util.Collections
 import scala.collection.JavaConverters._
 
 @Tag("functional")
 class TestMetadataTableWithSparkDataSource extends 
SparkClientFunctionalTestHarness {
 
   val hudi = "org.apache.hudi"
-  var commonOpts = Map(
+  var nonPartitionedCommonOpts = Map(
     "hoodie.insert.shuffle.parallelism" -> "4",
     "hoodie.upsert.shuffle.parallelism" -> "4",
     "hoodie.bulkinsert.shuffle.parallelism" -> "2",
     "hoodie.delete.shuffle.parallelism" -> "1",
     DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
   )
 
+  var partitionedCommonOpts =  nonPartitionedCommonOpts ++ Map(
+    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition")
+
   override def conf: SparkConf = conf(getSparkSqlConf)
 
   @ParameterizedTest
-  @ValueSource(ints = Array(1/*, 5*/)) // TODO: fix for higher 
compactNumDeltaCommits - HUDI-6340
-  def testReadability(compactNumDeltaCommits: Int): Unit = {
+  @CsvSource(Array("1,true", "1,false")) // TODO: fix for higher 
compactNumDeltaCommits - HUDI-6340
+  def testReadability(compactNumDeltaCommits: Int, testPartitioned: Boolean): 
Unit = {
     val dataGen = new HoodieTestDataGenerator()
 
     val metadataOpts: Map[String, String] = Map(
@@ -61,6 +71,12 @@ class TestMetadataTableWithSparkDataSource extends 
SparkClientFunctionalTestHarn
       HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
     )
 
+    val commonOpts = if (testPartitioned) {
+      partitionedCommonOpts
+    } else {
+      nonPartitionedCommonOpts
+    }
+
     val combinedOpts: Map[String, String] = commonOpts ++ metadataOpts ++
       Map(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> 
compactNumDeltaCommits.toString)
 
@@ -84,16 +100,23 @@ class TestMetadataTableWithSparkDataSource extends 
SparkClientFunctionalTestHarn
       .mode(SaveMode.Append)
       .save(basePath)
 
+    if (testPartitioned) {
+      validatePartitionedTable(basePath)
+    } else {
+      validateUnPartitionedTable(basePath)
+    }
+  }
+
+  private def validatePartitionedTable(basePath: String) : Unit = {
     // Files partition of MT
     val filesPartitionDF = 
spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/files")
-
     // Smoke test
     filesPartitionDF.show()
 
     // Query w/ 0 requested columns should be working fine
     assertEquals(4, filesPartitionDF.count())
 
-    val expectedKeys = Seq("2015/03/16", "2015/03/17", "2016/03/15", 
"__all_partitions__")
+    val expectedKeys = Seq("2015/03/16", "2015/03/17", "2016/03/15", 
HoodieTableMetadata.RECORDKEY_PARTITION_LIST)
     val keys = filesPartitionDF.select("key")
       .collect()
       .map(_.getString(0))
@@ -104,9 +127,90 @@ class TestMetadataTableWithSparkDataSource extends 
SparkClientFunctionalTestHarn
 
     // Column Stats Index partition of MT
     val colStatsDF = 
spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
+    // Smoke test
+    colStatsDF.show()
+
+    // lets pick one data file and validate col stats
+    val partitionPathToTest = "2015/03/16"
+    val engineContext = new HoodieSparkEngineContext(jsc())
+    val metadataConfig = 
HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).build();
+    val baseTableMetada : HoodieTableMetadata = new 
HoodieBackedTableMetadata(engineContext, metadataConfig, s"$basePath", false)
+
+    val fileStatuses = baseTableMetada.getAllFilesInPartition(new 
Path(s"$basePath/" + partitionPathToTest))
+    val fileName = fileStatuses.apply(0).getPath.getName
+
+    val partitionFileNamePair : 
java.util.List[org.apache.hudi.common.util.collection.Pair[String, String]] = 
new util.ArrayList
+    
partitionFileNamePair.add(org.apache.hudi.common.util.collection.Pair.of(partitionPathToTest,fileName))
+
+    val colStatsRecords = 
baseTableMetada.getColumnStats(partitionFileNamePair, "begin_lat")
+    assertEquals(colStatsRecords.size(), 1)
+    val metadataColStats = colStatsRecords.get(partitionFileNamePair.get(0))
+
+    // read parquet file and verify stats
+    val colRangeMetadataList: 
java.util.List[HoodieColumnRangeMetadata[Comparable[_]]] = new ParquetUtils()
+      .readRangeFromParquetMetadata(jsc().hadoopConfiguration(), 
fileStatuses.apply(0).getPath, Collections.singletonList("begin_lat"))
+    val columnRangeMetadata = colRangeMetadataList.get(0)
+
+    assertEquals(metadataColStats.getValueCount, 
columnRangeMetadata.getValueCount)
+    assertEquals(metadataColStats.getTotalSize, 
columnRangeMetadata.getTotalSize)
+    
assertEquals(HoodieAvroUtils.unwrapAvroValueWrapper(metadataColStats.getMaxValue),
 columnRangeMetadata.getMaxValue)
+    
assertEquals(HoodieAvroUtils.unwrapAvroValueWrapper(metadataColStats.getMinValue),
 columnRangeMetadata.getMinValue)
+    assertEquals(metadataColStats.getFileName, fileName)
+  }
+
+  private def validateUnPartitionedTable(basePath: String) : Unit = {
+    // Files partition of MT
+    val filesPartitionDF = 
spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/files")
+    // Smoke test
+    filesPartitionDF.show()
 
+    // Query w/ 0 requested columns should be working fine
+    assertEquals(2, filesPartitionDF.count())
+
+    val expectedKeys = Seq(HoodieTableMetadata.NON_PARTITIONED_NAME, 
HoodieTableMetadata.RECORDKEY_PARTITION_LIST)
+    val keys = filesPartitionDF.select("key")
+      .collect()
+      .map(_.getString(0))
+      .toSeq
+      .sorted
+
+    assertEquals(expectedKeys, keys)
+
+    // Column Stats Index partition of MT
+    val colStatsDF = 
spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
     // Smoke test
     colStatsDF.show()
+
+    // lets pick one data file and validate col stats
+    val partitionPathToTest = ""
+    val engineContext = new HoodieSparkEngineContext(jsc())
+    val metadataConfig = 
HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).build();
+    val baseTableMetada : HoodieTableMetadata = new 
HoodieBackedTableMetadata(engineContext, metadataConfig, s"$basePath", false)
+
+    val allPartitionPaths = baseTableMetada.getAllPartitionPaths
+    assertEquals(allPartitionPaths.size(), 1)
+    assertEquals(allPartitionPaths.get(0), 
HoodieTableMetadata.EMPTY_PARTITION_NAME)
+
+    val fileStatuses = baseTableMetada.getAllFilesInPartition(new 
Path(s"$basePath/"))
+    val fileName = fileStatuses.apply(0).getPath.getName
+
+    val partitionFileNamePair : 
java.util.List[org.apache.hudi.common.util.collection.Pair[String, String]] = 
new util.ArrayList
+    
partitionFileNamePair.add(org.apache.hudi.common.util.collection.Pair.of(partitionPathToTest,fileName))
+
+    val colStatsRecords = 
baseTableMetada.getColumnStats(partitionFileNamePair, "begin_lat")
+    assertEquals(colStatsRecords.size(), 1)
+    val metadataColStats = colStatsRecords.get(partitionFileNamePair.get(0))
+
+    // read parquet file and verify stats
+    val colRangeMetadataList: 
java.util.List[HoodieColumnRangeMetadata[Comparable[_]]] = new ParquetUtils()
+      .readRangeFromParquetMetadata(jsc().hadoopConfiguration(), 
fileStatuses.apply(0).getPath, Collections.singletonList("begin_lat"))
+    val columnRangeMetadata = colRangeMetadataList.get(0)
+
+    assertEquals(metadataColStats.getValueCount, 
columnRangeMetadata.getValueCount)
+    assertEquals(metadataColStats.getTotalSize, 
columnRangeMetadata.getTotalSize)
+    
assertEquals(HoodieAvroUtils.unwrapAvroValueWrapper(metadataColStats.getMaxValue),
 columnRangeMetadata.getMaxValue)
+    
assertEquals(HoodieAvroUtils.unwrapAvroValueWrapper(metadataColStats.getMinValue),
 columnRangeMetadata.getMinValue)
+    assertEquals(metadataColStats.getFileName, fileName)
   }
 
   private def parseRecords(records: Seq[String]) = {

Reply via email to