This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 223588c7071c0a6df2d238e7e01f704e4d2a7b30 Author: Y Ethan Guo <[email protected]> AuthorDate: Thu May 9 22:15:29 2024 -0700 [HUDI-7729] Move ParquetUtils to hudi-hadoop-common module (#11186) --- .../org/apache/hudi/common/util/BaseFileUtils.java | 18 +++- .../hudi/metadata/HoodieTableMetadataUtil.java | 7 +- .../java/org/apache/hudi/common/util/OrcUtils.java | 7 ++ .../org/apache/hudi/common/util/ParquetUtils.java | 116 ++++++++++----------- .../apache/hudi/common/util/TestParquetUtils.java | 114 ++++++++++++++++++++ .../org/apache/hudi/ColumnStatsIndexHelper.java | 2 +- .../hudi/functional/TestColumnStatsIndex.scala | 2 +- .../TestMetadataTableWithSparkDataSource.scala | 5 +- .../utilities/HoodieMetadataTableValidator.java | 4 +- 9 files changed, 203 insertions(+), 72 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java index 317a38bfc3e..95e117cee44 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java @@ -47,11 +47,12 @@ import java.util.Set; * Utils for Hudi base file. */ public abstract class BaseFileUtils { + public static final String PARQUET_UTILS = "org.apache.hudi.common.util.ParquetUtils"; public static final String ORC_UTILS = "org.apache.hudi.common.util.OrcUtils"; public static BaseFileUtils getInstance(String path) { if (path.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - return new ParquetUtils(); + return ReflectionUtils.loadClass(PARQUET_UTILS); } else if (path.endsWith(HoodieFileFormat.ORC.getFileExtension())) { return ReflectionUtils.loadClass(ORC_UTILS); } @@ -60,7 +61,7 @@ public abstract class BaseFileUtils { public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) { if (HoodieFileFormat.PARQUET.equals(fileFormat)) { - return new ParquetUtils(); + return ReflectionUtils.loadClass(PARQUET_UTILS); } else if (HoodieFileFormat.ORC.equals(fileFormat)) { return ReflectionUtils.loadClass(ORC_UTILS); } @@ -233,6 +234,19 @@ public abstract class BaseFileUtils { */ public abstract Schema readAvroSchema(StorageConfiguration<?> configuration, StoragePath filePath); + /** + * Reads column statistics stored in the metadata. + * + * @param storageConf storage configuration. + * @param filePath the data file path. + * @param columnList List of columns to get column statistics. + * @return {@link List} of {@link HoodieColumnRangeMetadata}. + */ + @SuppressWarnings("rawtype") + public abstract List<HoodieColumnRangeMetadata<Comparable>> readColumnStatsFromMetadata(StorageConfiguration<?> storageConf, + StoragePath filePath, + List<String> columnList); + /** * @return The subclass's {@link HoodieFileFormat}. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 41dfe940f6e..0198c402c75 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -61,7 +61,6 @@ import org.apache.hudi.common.util.ExternalFilePathUtil; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.Pair; @@ -1176,8 +1175,8 @@ public class HoodieTableMetadataUtil { try { if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { StoragePath fullFilePath = new StoragePath(datasetMetaClient.getBasePathV2(), filePath); - return - new ParquetUtils().readRangeFromParquetMetadata(datasetMetaClient.getStorageConf(), fullFilePath, columnsToIndex); + return BaseFileUtils.getInstance(HoodieFileFormat.PARQUET) + .readColumnStatsFromMetadata(datasetMetaClient.getStorageConf(), fullFilePath, columnsToIndex); } LOG.warn("Column range index not supported for: {}", filePath); @@ -1242,7 +1241,7 @@ public class HoodieTableMetadataUtil { * it could subsequently be used in column stats * * NOTE: This method has to stay compatible with the semantic of - * {@link ParquetUtils#readRangeFromParquetMetadata} as they are used in tandem + * {@link ParquetUtils#readColumnStatsFromMetadata} as they are used in tandem */ private static Comparable<?> coerceToComparable(Schema schema, Object val) { if (val == null) { diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java index d0f51763e8d..185061bc464 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.util; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -266,6 +267,12 @@ public class OrcUtils extends BaseFileUtils { } } + @Override + public List<HoodieColumnRangeMetadata<Comparable>> readColumnStatsFromMetadata(StorageConfiguration<?> storageConf, StoragePath filePath, List<String> columnList) { + throw new UnsupportedOperationException( + "Reading column statistics from metadata is not supported for ORC format yet"); + } + @Override public HoodieFileFormat getFormat() { return HoodieFileFormat.ORC; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java similarity index 89% rename from hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index 9298626262d..9d7ac5c6623 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -7,13 +7,14 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.hudi.common.util; @@ -242,6 +243,55 @@ public class ParquetUtils extends BaseFileUtils { return new AvroSchemaConverter(conf.unwrapAs(Configuration.class)).convert(parquetSchema); } + @Override + public List<HoodieColumnRangeMetadata<Comparable>> readColumnStatsFromMetadata(StorageConfiguration<?> storageConf, + StoragePath filePath, + List<String> columnList) { + ParquetMetadata metadata = readMetadata(storageConf, filePath); + + // NOTE: This collector has to have fully specialized generic type params since + // Java 1.8 struggles to infer them + Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, List<HoodieColumnRangeMetadata<Comparable>>>> groupingByCollector = + Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName); + + // Collect stats from all individual Parquet blocks + Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap = + (Map<String, List<HoodieColumnRangeMetadata<Comparable>>>) metadata.getBlocks().stream().sequential() + .flatMap(blockMetaData -> + blockMetaData.getColumns().stream() + .filter(f -> columnList.contains(f.getPath().toDotString())) + .map(columnChunkMetaData -> { + Statistics stats = columnChunkMetaData.getStatistics(); + return HoodieColumnRangeMetadata.<Comparable>create( + filePath.getName(), + columnChunkMetaData.getPath().toDotString(), + convertToNativeJavaType( + columnChunkMetaData.getPrimitiveType(), + stats.genericGetMin()), + convertToNativeJavaType( + columnChunkMetaData.getPrimitiveType(), + stats.genericGetMax()), + // NOTE: In case when column contains only nulls Parquet won't be creating + // stats for it instead returning stubbed (empty) object. In that case + // we have to equate number of nulls to the value count ourselves + stats.isEmpty() ? columnChunkMetaData.getValueCount() : stats.getNumNulls(), + columnChunkMetaData.getValueCount(), + columnChunkMetaData.getTotalSize(), + columnChunkMetaData.getTotalUncompressedSize()); + }) + ) + .collect(groupingByCollector); + + // Combine those into file-level statistics + // NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer + // expression type correctly) + Stream<HoodieColumnRangeMetadata<Comparable>> stream = columnToStatsListMap.values() + .stream() + .map(this::getColumnRangeInFile); + + return stream.collect(Collectors.toList()); + } + @Override public HoodieFileFormat getFormat() { return HoodieFileFormat.PARQUET; @@ -322,60 +372,6 @@ public class ParquetUtils extends BaseFileUtils { } } - /** - * Parse min/max statistics stored in parquet footers for all columns. - */ - @SuppressWarnings("rawtype") - public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata( - @Nonnull StorageConfiguration<?> conf, - @Nonnull StoragePath parquetFilePath, - @Nonnull List<String> cols - ) { - ParquetMetadata metadata = readMetadata(conf, parquetFilePath); - - // NOTE: This collector has to have fully specialized generic type params since - // Java 1.8 struggles to infer them - Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, List<HoodieColumnRangeMetadata<Comparable>>>> groupingByCollector = - Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName); - - // Collect stats from all individual Parquet blocks - Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap = - (Map<String, List<HoodieColumnRangeMetadata<Comparable>>>) metadata.getBlocks().stream().sequential() - .flatMap(blockMetaData -> - blockMetaData.getColumns().stream() - .filter(f -> cols.contains(f.getPath().toDotString())) - .map(columnChunkMetaData -> { - Statistics stats = columnChunkMetaData.getStatistics(); - return HoodieColumnRangeMetadata.<Comparable>create( - parquetFilePath.getName(), - columnChunkMetaData.getPath().toDotString(), - convertToNativeJavaType( - columnChunkMetaData.getPrimitiveType(), - stats.genericGetMin()), - convertToNativeJavaType( - columnChunkMetaData.getPrimitiveType(), - stats.genericGetMax()), - // NOTE: In case when column contains only nulls Parquet won't be creating - // stats for it instead returning stubbed (empty) object. In that case - // we have to equate number of nulls to the value count ourselves - stats.isEmpty() ? columnChunkMetaData.getValueCount() : stats.getNumNulls(), - columnChunkMetaData.getValueCount(), - columnChunkMetaData.getTotalSize(), - columnChunkMetaData.getTotalUncompressedSize()); - }) - ) - .collect(groupingByCollector); - - // Combine those into file-level statistics - // NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer - // expression type correctly) - Stream<HoodieColumnRangeMetadata<Comparable>> stream = columnToStatsListMap.values() - .stream() - .map(this::getColumnRangeInFile); - - return stream.collect(Collectors.toList()); - } - private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRangeInFile( @Nonnull List<HoodieColumnRangeMetadata<T>> blockRanges ) { diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java index b4ed39316f5..2681e34425a 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; @@ -45,16 +46,20 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import java.io.IOException; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; +import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema; import static org.apache.hudi.avro.HoodieAvroUtils.METADATA_FIELD_SCHEMA; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -197,6 +202,115 @@ public class TestParquetUtils extends HoodieCommonTestHarness { HoodieTestUtils.getDefaultStorageConf(), new StoragePath(filePath))); } + @Test + public void testReadColumnStatsFromMetadata() throws Exception { + List<Pair<Pair<String, String>, Boolean>> valueList = new ArrayList<>(); + String minKey = "z"; + String maxKey = "0"; + String minValue = "z"; + String maxValue = "0"; + int nullValueCount = 0; + int totalCount = 1000; + String partitionPath = "path1"; + for (int i = 0; i < totalCount; i++) { + boolean nullifyData = i % 3 == 0; + String rowKey = UUID.randomUUID().toString(); + String value = String.valueOf(i); + valueList.add(Pair.of(Pair.of(rowKey, value), nullifyData)); + minKey = (minKey.compareTo(rowKey) > 0) ? rowKey : minKey; + maxKey = (maxKey.compareTo(rowKey) < 0) ? rowKey : maxKey; + + if (nullifyData) { + nullValueCount++; + } else { + minValue = (minValue.compareTo(value) > 0) ? value : minValue; + maxValue = (maxValue.compareTo(value) < 0) ? value : maxValue; + } + } + + String fileName = "test.parquet"; + String filePath = new StoragePath(basePath, fileName).toString(); + String recordKeyField = "id"; + String partitionPathField = "partition"; + String dataField = "data"; + Schema schema = getSchema(recordKeyField, partitionPathField, dataField); + + BloomFilter filter = BloomFilterFactory + .createBloomFilter(1000, 0.0001, 10000, BloomFilterTypeCode.SIMPLE.name()); + HoodieAvroWriteSupport writeSupport = + new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, Option.of(filter), new Properties()); + try (ParquetWriter writer = new ParquetWriter(new Path(filePath), writeSupport, CompressionCodecName.GZIP, + 120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE)) { + valueList.forEach(entry -> { + GenericRecord rec = new GenericData.Record(schema); + rec.put(recordKeyField, entry.getLeft().getLeft()); + rec.put(partitionPathField, partitionPath); + if (entry.getRight()) { + rec.put(dataField, null); + } else { + rec.put(dataField, entry.getLeft().getRight()); + } + try { + writer.write(rec); + } catch (IOException e) { + throw new RuntimeException(e); + } + writeSupport.add(entry.getLeft().getLeft()); + }); + } + + List<String> columnList = new ArrayList<>(); + columnList.add(recordKeyField); + columnList.add(partitionPathField); + columnList.add(dataField); + + List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = parquetUtils.readColumnStatsFromMetadata( + HoodieTestUtils.getDefaultStorageConf(), new StoragePath(filePath), columnList) + .stream() + .sorted(Comparator.comparing(HoodieColumnRangeMetadata::getColumnName)) + .collect(Collectors.toList()); + assertEquals(3, columnRangeMetadataList.size(), "Should return column stats of 3 columns"); + validateColumnRangeMetadata(columnRangeMetadataList.get(0), + fileName, dataField, minValue, maxValue, nullValueCount, totalCount); + validateColumnRangeMetadata(columnRangeMetadataList.get(1), + fileName, recordKeyField, minKey, maxKey, 0, totalCount); + validateColumnRangeMetadata(columnRangeMetadataList.get(2), + fileName, partitionPathField, partitionPath, partitionPath, 0, totalCount); + } + + private Schema getSchema(String recordKeyField, String partitionPathField, String dataField) { + List<Schema.Field> toBeAddedFields = new ArrayList<>(); + Schema recordSchema = Schema.createRecord("HoodieRecord", "", "", false); + + Schema.Field recordKeySchemaField = + new Schema.Field(recordKeyField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE); + Schema.Field partitionPathSchemaField = + new Schema.Field(partitionPathField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE); + Schema.Field dataSchemaField = + new Schema.Field(dataField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE); + + toBeAddedFields.add(recordKeySchemaField); + toBeAddedFields.add(partitionPathSchemaField); + toBeAddedFields.add(dataSchemaField); + recordSchema.setFields(toBeAddedFields); + return recordSchema; + } + + private void validateColumnRangeMetadata(HoodieColumnRangeMetadata metadata, + String filePath, + String columnName, + String minValue, + String maxValue, + long nullCount, + long valueCount) { + assertEquals(filePath, metadata.getFilePath(), "File path does not match"); + assertEquals(columnName, metadata.getColumnName(), "Column name does not match"); + assertEquals(minValue, metadata.getMinValue(), "Min value does not match"); + assertEquals(maxValue, metadata.getMaxValue(), "Max value does not match"); + assertEquals(nullCount, metadata.getNullCount(), "Null count does not match"); + assertEquals(valueCount, metadata.getValueCount(), "Value count does not match"); + } + private void writeParquetFile(String typeCode, String filePath, List<String> rowKeys) throws Exception { writeParquetFile(typeCode, filePath, rowKeys, HoodieAvroUtils.getRecordKeySchema(), false, ""); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java index 5a1877be101..11abebbb245 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java @@ -178,7 +178,7 @@ public class ColumnStatsIndexHelper { Iterable<String> iterable = () -> paths; return StreamSupport.stream(iterable.spliterator(), false) .flatMap(path -> - utils.readRangeFromParquetMetadata( + utils.readColumnStatsFromMetadata( storageConf, new StoragePath(path), columnNames diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala index 4b7f9855d27..32a91279e97 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala @@ -403,7 +403,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { val parquetFilePath = new StoragePath( fs.listStatus(path).filter(fs => fs.getPath.getName.endsWith(".parquet")).toSeq.head.getPath.toUri) - val ranges = utils.readRangeFromParquetMetadata(conf, parquetFilePath, + val ranges = utils.readColumnStatsFromMetadata(conf, parquetFilePath, Seq("c1", "c2", "c3a", "c3b", "c3c", "c4", "c5", "c6", "c7", "c8").asJava) ranges.asScala.foreach(r => { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala index c5d02267f2b..8c7e01488fc 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala @@ -42,6 +42,7 @@ import org.junit.jupiter.params.provider.CsvSource import java.util import java.util.Collections + import scala.collection.JavaConverters._ @Tag("functional") @@ -150,7 +151,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn // read parquet file and verify stats val colRangeMetadataList: java.util.List[HoodieColumnRangeMetadata[Comparable[_]]] = new ParquetUtils() - .readRangeFromParquetMetadata(HadoopFSUtils.getStorageConf(jsc().hadoopConfiguration()), + .readColumnStatsFromMetadata(HadoopFSUtils.getStorageConf(jsc().hadoopConfiguration()), fileStatuses.get(0).getPath, Collections.singletonList("begin_lat")) val columnRangeMetadata = colRangeMetadataList.get(0) @@ -206,7 +207,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn // read parquet file and verify stats val colRangeMetadataList: java.util.List[HoodieColumnRangeMetadata[Comparable[_]]] = new ParquetUtils() - .readRangeFromParquetMetadata(HadoopFSUtils.getStorageConf(jsc().hadoopConfiguration()), + .readColumnStatsFromMetadata(HadoopFSUtils.getStorageConf(jsc().hadoopConfiguration()), fileStatuses.get(0).getPath, Collections.singletonList("begin_lat")) val columnRangeMetadata = colRangeMetadataList.get(0) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index 7554b31272f..b0fe09b4c76 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -49,11 +49,11 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; @@ -1405,7 +1405,7 @@ public class HoodieMetadataTableValidator implements Serializable { .collect(Collectors.toList()); } else { return baseFileNameList.stream().flatMap(filename -> - new ParquetUtils().readRangeFromParquetMetadata( + BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readColumnStatsFromMetadata( metaClient.getStorageConf(), new StoragePath(FSUtils.constructAbsolutePath(metaClient.getBasePathV2(), partitionPath), filename), allColumnNameList).stream())
