This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8f5f5470b61 [HUDI-7729] Move ParquetUtils to hudi-hadoop-common module (#11186)
8f5f5470b61 is described below
commit 8f5f5470b61f6b2248e561c0ec7336d6f652c091
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu May 9 22:15:29 2024 -0700
[HUDI-7729] Move ParquetUtils to hudi-hadoop-common module (#11186)
---
.../org/apache/hudi/common/util/BaseFileUtils.java | 18 +++-
.../hudi/metadata/HoodieTableMetadataUtil.java | 7 +-
.../java/org/apache/hudi/common/util/OrcUtils.java | 7 ++
.../org/apache/hudi/common/util/ParquetUtils.java | 116 ++++++++++-----------
.../apache/hudi/common/util/TestParquetUtils.java | 114 ++++++++++++++++++++
.../org/apache/hudi/ColumnStatsIndexHelper.java | 2 +-
.../hudi/functional/TestColumnStatsIndex.scala | 2 +-
.../TestMetadataTableWithSparkDataSource.scala | 5 +-
.../utilities/HoodieMetadataTableValidator.java | 4 +-
9 files changed, 203 insertions(+), 72 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index c22b7f6a087..b36957609fb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -51,11 +51,12 @@ import java.util.stream.Collectors;
* Utils for Hudi base file.
*/
public abstract class BaseFileUtils {
+ public static final String PARQUET_UTILS =
"org.apache.hudi.common.util.ParquetUtils";
public static final String ORC_UTILS =
"org.apache.hudi.common.util.OrcUtils";
public static BaseFileUtils getInstance(String path) {
if (path.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
- return new ParquetUtils();
+ return ReflectionUtils.loadClass(PARQUET_UTILS);
} else if (path.endsWith(HoodieFileFormat.ORC.getFileExtension())) {
return ReflectionUtils.loadClass(ORC_UTILS);
}
@@ -64,7 +65,7 @@ public abstract class BaseFileUtils {
public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) {
if (HoodieFileFormat.PARQUET.equals(fileFormat)) {
- return new ParquetUtils();
+ return ReflectionUtils.loadClass(PARQUET_UTILS);
} else if (HoodieFileFormat.ORC.equals(fileFormat)) {
return ReflectionUtils.loadClass(ORC_UTILS);
}
@@ -285,6 +286,19 @@ public abstract class BaseFileUtils {
*/
public abstract Schema readAvroSchema(StorageConfiguration<?> configuration,
StoragePath filePath);
+ /**
+ * Reads column statistics stored in the metadata.
+ *
+ * @param storageConf storage configuration.
+ * @param filePath the data file path.
+ * @param columnList List of columns to get column statistics.
+ * @return {@link List} of {@link HoodieColumnRangeMetadata}.
+ */
+ @SuppressWarnings("rawtype")
+ public abstract List<HoodieColumnRangeMetadata<Comparable>>
readColumnStatsFromMetadata(StorageConfiguration<?> storageConf,
+
StoragePath filePath,
+
List<String> columnList);
+
/**
* @return The subclass's {@link HoodieFileFormat}.
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 9e6e5b42975..3484fe8ae57 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -73,7 +73,6 @@ import org.apache.hudi.common.util.ExternalFilePathUtil;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
@@ -1214,8 +1213,8 @@ public class HoodieTableMetadataUtil {
try {
if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
StoragePath fullFilePath = new
StoragePath(datasetMetaClient.getBasePathV2(), filePath);
- return
- new
ParquetUtils().readRangeFromParquetMetadata(datasetMetaClient.getStorageConf(),
fullFilePath, columnsToIndex);
+ return BaseFileUtils.getInstance(HoodieFileFormat.PARQUET)
+ .readColumnStatsFromMetadata(datasetMetaClient.getStorageConf(),
fullFilePath, columnsToIndex);
}
LOG.warn("Column range index not supported for: {}", filePath);
@@ -1280,7 +1279,7 @@ public class HoodieTableMetadataUtil {
* it could subsequently be used in column stats
*
* NOTE: This method has to stay compatible with the semantic of
- * {@link ParquetUtils#readRangeFromParquetMetadata} as they are used
in tandem
+ * {@link ParquetUtils#readColumnStatsFromMetadata} as they are used in
tandem
*/
private static Comparable<?> coerceToComparable(Schema schema, Object val) {
if (val == null) {
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
index c37d3118c4d..e34f8c4f195 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.util;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -273,6 +274,12 @@ public class OrcUtils extends BaseFileUtils {
}
}
+ @Override
+ public List<HoodieColumnRangeMetadata<Comparable>>
readColumnStatsFromMetadata(StorageConfiguration<?> storageConf, StoragePath
filePath, List<String> columnList) {
+ throw new UnsupportedOperationException(
+ "Reading column statistics from metadata is not supported for ORC
format yet");
+ }
+
@Override
public HoodieFileFormat getFormat() {
return HoodieFileFormat.ORC;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
similarity index 89%
rename from hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index 1c896654510..e31e610c7d7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -7,13 +7,14 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.hudi.common.util;
@@ -250,6 +251,55 @@ public class ParquetUtils extends BaseFileUtils {
return new
AvroSchemaConverter(conf.unwrapAs(Configuration.class)).convert(parquetSchema);
}
+ @Override
+ public List<HoodieColumnRangeMetadata<Comparable>>
readColumnStatsFromMetadata(StorageConfiguration<?> storageConf,
+
StoragePath filePath,
+
List<String> columnList) {
+ ParquetMetadata metadata = readMetadata(storageConf, filePath);
+
+ // NOTE: This collector has to have fully specialized generic type params
since
+ // Java 1.8 struggles to infer them
+ Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String,
List<HoodieColumnRangeMetadata<Comparable>>>> groupingByCollector =
+ Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName);
+
+ // Collect stats from all individual Parquet blocks
+ Map<String, List<HoodieColumnRangeMetadata<Comparable>>>
columnToStatsListMap =
+ (Map<String, List<HoodieColumnRangeMetadata<Comparable>>>)
metadata.getBlocks().stream().sequential()
+ .flatMap(blockMetaData ->
+ blockMetaData.getColumns().stream()
+ .filter(f ->
columnList.contains(f.getPath().toDotString()))
+ .map(columnChunkMetaData -> {
+ Statistics stats = columnChunkMetaData.getStatistics();
+ return HoodieColumnRangeMetadata.<Comparable>create(
+ filePath.getName(),
+ columnChunkMetaData.getPath().toDotString(),
+ convertToNativeJavaType(
+ columnChunkMetaData.getPrimitiveType(),
+ stats.genericGetMin()),
+ convertToNativeJavaType(
+ columnChunkMetaData.getPrimitiveType(),
+ stats.genericGetMax()),
+ // NOTE: In case when column contains only nulls
Parquet won't be creating
+ // stats for it instead returning stubbed
(empty) object. In that case
+ // we have to equate number of nulls to the
value count ourselves
+ stats.isEmpty() ?
columnChunkMetaData.getValueCount() : stats.getNumNulls(),
+ columnChunkMetaData.getValueCount(),
+ columnChunkMetaData.getTotalSize(),
+ columnChunkMetaData.getTotalUncompressedSize());
+ })
+ )
+ .collect(groupingByCollector);
+
+ // Combine those into file-level statistics
+ // NOTE: Inlining this var makes javac (1.8) upset (due to its inability
to infer
+ // expression type correctly)
+ Stream<HoodieColumnRangeMetadata<Comparable>> stream =
columnToStatsListMap.values()
+ .stream()
+ .map(this::getColumnRangeInFile);
+
+ return stream.collect(Collectors.toList());
+ }
+
@Override
public HoodieFileFormat getFormat() {
return HoodieFileFormat.PARQUET;
@@ -330,60 +380,6 @@ public class ParquetUtils extends BaseFileUtils {
}
}
- /**
- * Parse min/max statistics stored in parquet footers for all columns.
- */
- @SuppressWarnings("rawtype")
- public List<HoodieColumnRangeMetadata<Comparable>>
readRangeFromParquetMetadata(
- @Nonnull StorageConfiguration<?> conf,
- @Nonnull StoragePath parquetFilePath,
- @Nonnull List<String> cols
- ) {
- ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
-
- // NOTE: This collector has to have fully specialized generic type params
since
- // Java 1.8 struggles to infer them
- Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String,
List<HoodieColumnRangeMetadata<Comparable>>>> groupingByCollector =
- Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName);
-
- // Collect stats from all individual Parquet blocks
- Map<String, List<HoodieColumnRangeMetadata<Comparable>>>
columnToStatsListMap =
- (Map<String, List<HoodieColumnRangeMetadata<Comparable>>>)
metadata.getBlocks().stream().sequential()
- .flatMap(blockMetaData ->
- blockMetaData.getColumns().stream()
- .filter(f -> cols.contains(f.getPath().toDotString()))
- .map(columnChunkMetaData -> {
- Statistics stats = columnChunkMetaData.getStatistics();
- return HoodieColumnRangeMetadata.<Comparable>create(
- parquetFilePath.getName(),
- columnChunkMetaData.getPath().toDotString(),
- convertToNativeJavaType(
- columnChunkMetaData.getPrimitiveType(),
- stats.genericGetMin()),
- convertToNativeJavaType(
- columnChunkMetaData.getPrimitiveType(),
- stats.genericGetMax()),
- // NOTE: In case when column contains only nulls Parquet
won't be creating
- // stats for it instead returning stubbed (empty)
object. In that case
- // we have to equate number of nulls to the value
count ourselves
- stats.isEmpty() ? columnChunkMetaData.getValueCount() :
stats.getNumNulls(),
- columnChunkMetaData.getValueCount(),
- columnChunkMetaData.getTotalSize(),
- columnChunkMetaData.getTotalUncompressedSize());
- })
- )
- .collect(groupingByCollector);
-
- // Combine those into file-level statistics
- // NOTE: Inlining this var makes javac (1.8) upset (due to its inability
to infer
- // expression type correctly)
- Stream<HoodieColumnRangeMetadata<Comparable>> stream =
columnToStatsListMap.values()
- .stream()
- .map(this::getColumnRangeInFile);
-
- return stream.collect(Collectors.toList());
- }
-
private <T extends Comparable<T>> HoodieColumnRangeMetadata<T>
getColumnRangeInFile(
@Nonnull List<HoodieColumnRangeMetadata<T>> blockRanges
) {
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
index 392dbd29d20..e39bd25f46a 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
@@ -46,16 +47,20 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
+import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
import static org.apache.hudi.avro.HoodieAvroUtils.METADATA_FIELD_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -198,6 +203,115 @@ public class TestParquetUtils extends
HoodieCommonTestHarness {
HoodieTestUtils.getDefaultStorageConf(), new StoragePath(filePath)));
}
+ @Test
+ public void testReadColumnStatsFromMetadata() throws Exception {
+ List<Pair<Pair<String, String>, Boolean>> valueList = new ArrayList<>();
+ String minKey = "z";
+ String maxKey = "0";
+ String minValue = "z";
+ String maxValue = "0";
+ int nullValueCount = 0;
+ int totalCount = 1000;
+ String partitionPath = "path1";
+ for (int i = 0; i < totalCount; i++) {
+ boolean nullifyData = i % 3 == 0;
+ String rowKey = UUID.randomUUID().toString();
+ String value = String.valueOf(i);
+ valueList.add(Pair.of(Pair.of(rowKey, value), nullifyData));
+ minKey = (minKey.compareTo(rowKey) > 0) ? rowKey : minKey;
+ maxKey = (maxKey.compareTo(rowKey) < 0) ? rowKey : maxKey;
+
+ if (nullifyData) {
+ nullValueCount++;
+ } else {
+ minValue = (minValue.compareTo(value) > 0) ? value : minValue;
+ maxValue = (maxValue.compareTo(value) < 0) ? value : maxValue;
+ }
+ }
+
+ String fileName = "test.parquet";
+ String filePath = new StoragePath(basePath, fileName).toString();
+ String recordKeyField = "id";
+ String partitionPathField = "partition";
+ String dataField = "data";
+ Schema schema = getSchema(recordKeyField, partitionPathField, dataField);
+
+ BloomFilter filter = BloomFilterFactory
+ .createBloomFilter(1000, 0.0001, 10000,
BloomFilterTypeCode.SIMPLE.name());
+ HoodieAvroWriteSupport writeSupport =
+ new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema),
schema, Option.of(filter), new Properties());
+ try (ParquetWriter writer = new ParquetWriter(new Path(filePath),
writeSupport, CompressionCodecName.GZIP,
+ 120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE)) {
+ valueList.forEach(entry -> {
+ GenericRecord rec = new GenericData.Record(schema);
+ rec.put(recordKeyField, entry.getLeft().getLeft());
+ rec.put(partitionPathField, partitionPath);
+ if (entry.getRight()) {
+ rec.put(dataField, null);
+ } else {
+ rec.put(dataField, entry.getLeft().getRight());
+ }
+ try {
+ writer.write(rec);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ writeSupport.add(entry.getLeft().getLeft());
+ });
+ }
+
+ List<String> columnList = new ArrayList<>();
+ columnList.add(recordKeyField);
+ columnList.add(partitionPathField);
+ columnList.add(dataField);
+
+ List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList =
parquetUtils.readColumnStatsFromMetadata(
+ HoodieTestUtils.getDefaultStorageConf(), new
StoragePath(filePath), columnList)
+ .stream()
+ .sorted(Comparator.comparing(HoodieColumnRangeMetadata::getColumnName))
+ .collect(Collectors.toList());
+ assertEquals(3, columnRangeMetadataList.size(), "Should return column
stats of 3 columns");
+ validateColumnRangeMetadata(columnRangeMetadataList.get(0),
+ fileName, dataField, minValue, maxValue, nullValueCount, totalCount);
+ validateColumnRangeMetadata(columnRangeMetadataList.get(1),
+ fileName, recordKeyField, minKey, maxKey, 0, totalCount);
+ validateColumnRangeMetadata(columnRangeMetadataList.get(2),
+ fileName, partitionPathField, partitionPath, partitionPath, 0,
totalCount);
+ }
+
+ private Schema getSchema(String recordKeyField, String partitionPathField,
String dataField) {
+ List<Schema.Field> toBeAddedFields = new ArrayList<>();
+ Schema recordSchema = Schema.createRecord("HoodieRecord", "", "", false);
+
+ Schema.Field recordKeySchemaField =
+ new Schema.Field(recordKeyField,
createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE);
+ Schema.Field partitionPathSchemaField =
+ new Schema.Field(partitionPathField,
createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE);
+ Schema.Field dataSchemaField =
+ new Schema.Field(dataField, createNullableSchema(Schema.Type.STRING),
"", JsonProperties.NULL_VALUE);
+
+ toBeAddedFields.add(recordKeySchemaField);
+ toBeAddedFields.add(partitionPathSchemaField);
+ toBeAddedFields.add(dataSchemaField);
+ recordSchema.setFields(toBeAddedFields);
+ return recordSchema;
+ }
+
+ private void validateColumnRangeMetadata(HoodieColumnRangeMetadata metadata,
+ String filePath,
+ String columnName,
+ String minValue,
+ String maxValue,
+ long nullCount,
+ long valueCount) {
+ assertEquals(filePath, metadata.getFilePath(), "File path does not match");
+ assertEquals(columnName, metadata.getColumnName(), "Column name does not
match");
+ assertEquals(minValue, metadata.getMinValue(), "Min value does not match");
+ assertEquals(maxValue, metadata.getMaxValue(), "Max value does not match");
+ assertEquals(nullCount, metadata.getNullCount(), "Null count does not
match");
+ assertEquals(valueCount, metadata.getValueCount(), "Value count does not
match");
+ }
+
private void writeParquetFile(String typeCode, String filePath, List<String>
rowKeys) throws Exception {
writeParquetFile(typeCode, filePath, rowKeys,
HoodieAvroUtils.getRecordKeySchema(), false, "");
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
index 5a1877be101..11abebbb245 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
@@ -178,7 +178,7 @@ public class ColumnStatsIndexHelper {
Iterable<String> iterable = () -> paths;
return StreamSupport.stream(iterable.spliterator(), false)
.flatMap(path ->
- utils.readRangeFromParquetMetadata(
+ utils.readColumnStatsFromMetadata(
storageConf,
new StoragePath(path),
columnNames
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 20b8966bda3..953ab6ae1da 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -462,7 +462,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
val parquetFilePath = new StoragePath(
fs.listStatus(path).filter(fs =>
fs.getPath.getName.endsWith(".parquet")).toSeq.head.getPath.toUri)
- val ranges = utils.readRangeFromParquetMetadata(conf, parquetFilePath,
+ val ranges = utils.readColumnStatsFromMetadata(conf, parquetFilePath,
Seq("c1", "c2", "c3a", "c3b", "c3c", "c4", "c5", "c6", "c7",
"c8").asJava)
ranges.asScala.foreach(r => {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
index c5d02267f2b..8c7e01488fc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
@@ -42,6 +42,7 @@ import org.junit.jupiter.params.provider.CsvSource
import java.util
import java.util.Collections
+
import scala.collection.JavaConverters._
@Tag("functional")
@@ -150,7 +151,7 @@ class TestMetadataTableWithSparkDataSource extends
SparkClientFunctionalTestHarn
// read parquet file and verify stats
val colRangeMetadataList:
java.util.List[HoodieColumnRangeMetadata[Comparable[_]]] = new ParquetUtils()
-
.readRangeFromParquetMetadata(HadoopFSUtils.getStorageConf(jsc().hadoopConfiguration()),
+
.readColumnStatsFromMetadata(HadoopFSUtils.getStorageConf(jsc().hadoopConfiguration()),
fileStatuses.get(0).getPath, Collections.singletonList("begin_lat"))
val columnRangeMetadata = colRangeMetadataList.get(0)
@@ -206,7 +207,7 @@ class TestMetadataTableWithSparkDataSource extends
SparkClientFunctionalTestHarn
// read parquet file and verify stats
val colRangeMetadataList:
java.util.List[HoodieColumnRangeMetadata[Comparable[_]]] = new ParquetUtils()
-
.readRangeFromParquetMetadata(HadoopFSUtils.getStorageConf(jsc().hadoopConfiguration()),
+
.readColumnStatsFromMetadata(HadoopFSUtils.getStorageConf(jsc().hadoopConfiguration()),
fileStatuses.get(0).getPath, Collections.singletonList("begin_lat"))
val columnRangeMetadata = colRangeMetadataList.get(0)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 655572447d5..a9e2d895c13 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -49,11 +49,11 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -1404,7 +1404,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
.collect(Collectors.toList());
} else {
return baseFileNameList.stream().flatMap(filename ->
- new ParquetUtils().readRangeFromParquetMetadata(
+
BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readColumnStatsFromMetadata(
metaClient.getStorageConf(),
new
StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePathV2(),
partitionPath), filename),
allColumnNameList).stream())