This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new 43075655 Parquet source: Expand column stats support, fix bugs in
schema conversion (#805)
43075655 is described below
commit 430756556a24e3b3714477d33a74d1cc21b80f45
Author: Tim Brown <[email protected]>
AuthorDate: Mon Feb 23 10:55:29 2026 -0800
Parquet source: Expand column stats support, fix bugs in schema conversion
(#805)
* fix stats extraction to be per file and handle more types
* fix list and map handling
* cleanup
* spotless
* address comments, set null count, handle null stats
* remove more unused variables
---
.../xtable/parquet/ParquetConversionSource.java | 36 +-
.../xtable/parquet/ParquetMetadataExtractor.java | 24 +-
.../parquet/ParquetPartitionValueExtractor.java | 9 -
.../xtable/parquet/ParquetSchemaExtractor.java | 65 +-
.../xtable/parquet/ParquetStatsConverterUtil.java | 69 --
.../xtable/parquet/ParquetStatsExtractor.java | 182 ++---
.../xtable/parquet/TestParquetSchemaExtractor.java | 134 +++-
.../xtable/parquet/TestParquetStatsExtractor.java | 859 ++++++++++++---------
8 files changed, 755 insertions(+), 623 deletions(-)
diff --git
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
index e5625be7..61a2ba8f 100644
---
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
@@ -110,7 +110,8 @@ public class ParquetConversionSource implements
ConversionSource<Long> {
return createInternalTableFromFile(file);
}
- private Stream<InternalDataFile>
getInternalDataFiles(Stream<LocatedFileStatus> parquetFiles) {
+ private Stream<InternalDataFile> getInternalDataFiles(
+ Stream<LocatedFileStatus> parquetFiles, InternalSchema schema) {
return parquetFiles.map(
file ->
InternalDataFile.builder()
@@ -119,35 +120,29 @@ public class ParquetConversionSource implements
ConversionSource<Long> {
.fileSizeBytes(file.getLen())
.partitionValues(
partitionValueExtractor.extractPartitionValues(
- partitionSpecExtractor.spec(
-
partitionValueExtractor.extractSchemaForParquetPartitions(
- parquetMetadataExtractor.readParquetMetadata(
- hadoopConf, file.getPath()),
- file.getPath().toString())),
+ partitionSpecExtractor.spec(schema),
HudiPathUtils.getPartitionPath(new Path(basePath),
file.getPath())))
.lastModified(file.getModificationTime())
.columnStats(
- parquetStatsExtractor.getColumnStatsForaFile(
-
parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath())))
+ parquetStatsExtractor.getStatsForFile(
+
parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath()),
+ schema))
.build());
}
- private InternalDataFile createInternalDataFileFromParquetFile(FileStatus
parquetFile) {
+ private InternalDataFile createInternalDataFileFromParquetFile(
+ FileStatus parquetFile, InternalSchema schema) {
return InternalDataFile.builder()
.physicalPath(parquetFile.getPath().toString())
.partitionValues(
partitionValueExtractor.extractPartitionValues(
- partitionSpecExtractor.spec(
- partitionValueExtractor.extractSchemaForParquetPartitions(
- parquetMetadataExtractor.readParquetMetadata(
- hadoopConf, parquetFile.getPath()),
- parquetFile.getPath().toString())),
- basePath))
+ partitionSpecExtractor.spec(schema), basePath))
.lastModified(parquetFile.getModificationTime())
.fileSizeBytes(parquetFile.getLen())
.columnStats(
- parquetStatsExtractor.getColumnStatsForaFile(
- parquetMetadataExtractor.readParquetMetadata(hadoopConf,
parquetFile.getPath())))
+ parquetStatsExtractor.getStatsForFile(
+ parquetMetadataExtractor.readParquetMetadata(hadoopConf,
parquetFile.getPath()),
+ schema))
.build();
}
@@ -169,7 +164,8 @@ public class ParquetConversionSource implements
ConversionSource<Long> {
.collect(Collectors.toList());
InternalTable internalTable = getMostRecentTable(parquetFiles);
for (FileStatus tableStatus : tableChangesAfter) {
- InternalDataFile currentDataFile =
createInternalDataFileFromParquetFile(tableStatus);
+ InternalDataFile currentDataFile =
+ createInternalDataFileFromParquetFile(tableStatus,
internalTable.getReadSchema());
addedInternalDataFiles.add(currentDataFile);
}
@@ -198,9 +194,9 @@ public class ParquetConversionSource implements
ConversionSource<Long> {
@Override
public InternalSnapshot getCurrentSnapshot() {
// to avoid consume the stream call the method twice to return the same
stream of parquet files
- Stream<InternalDataFile> internalDataFiles =
- getInternalDataFiles(getParquetFiles(hadoopConf, basePath));
InternalTable table = getMostRecentTable(getParquetFiles(hadoopConf,
basePath));
+ Stream<InternalDataFile> internalDataFiles =
+ getInternalDataFiles(getParquetFiles(hadoopConf, basePath),
table.getReadSchema());
return InternalSnapshot.builder()
.table(table)
.sourceIdentifier(
diff --git
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
index f29a186d..f022b753 100644
---
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
@@ -20,6 +20,9 @@ package org.apache.xtable.parquet;
import java.io.IOException;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
@@ -27,11 +30,11 @@ import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.io.InputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.xtable.exception.ReadException;
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ParquetMetadataExtractor {
private static final ParquetMetadataExtractor INSTANCE = new
ParquetMetadataExtractor();
@@ -40,23 +43,16 @@ public class ParquetMetadataExtractor {
return INSTANCE;
}
- public static MessageType getSchema(ParquetMetadata footer) {
- MessageType schema = footer.getFileMetaData().getSchema();
- return schema;
+ public MessageType getSchema(ParquetMetadata footer) {
+ return footer.getFileMetaData().getSchema();
}
- public static ParquetMetadata readParquetMetadata(Configuration conf, Path
filePath) {
- InputFile file = null;
- try {
- file = HadoopInputFile.fromPath(filePath, conf);
- } catch (IOException e) {
- throw new ReadException("Failed to read the parquet file", e);
- }
-
+ public ParquetMetadata readParquetMetadata(Configuration conf, Path
filePath) {
ParquetReadOptions options = HadoopReadOptions.builder(conf,
filePath).build();
- try (ParquetFileReader fileReader = ParquetFileReader.open(file, options))
{
+ try (ParquetFileReader fileReader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(filePath, conf),
options)) {
return fileReader.getFooter();
- } catch (Exception e) {
+ } catch (IOException e) {
throw new ReadException("Failed to read the parquet file", e);
}
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
index 63b35846..19c2f705 100644
---
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
@@ -28,13 +28,9 @@ import java.util.TimeZone;
import lombok.NonNull;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.MessageType;
-
import org.apache.xtable.exception.PartitionValuesExtractorException;
import org.apache.xtable.hudi.PathBasedPartitionValuesExtractor;
import org.apache.xtable.model.schema.InternalPartitionField;
-import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
@@ -134,9 +130,4 @@ public class ParquetPartitionValueExtractor extends
PathBasedPartitionValuesExtr
public static ParquetPartitionValueExtractor getInstance() {
return INSTANCE;
}
-
- public InternalSchema extractSchemaForParquetPartitions(ParquetMetadata
footer, String path) {
- MessageType parquetSchema = parquetMetadataExtractor.getSchema(footer);
- return schemaExtractor.toInternalSchema(parquetSchema, path);
- }
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
index ed6cf284..48144287 100644
---
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
@@ -61,12 +61,9 @@ public class ParquetSchemaExtractor {
.dataType(InternalType.STRING)
.isNullable(false)
.build())
- .defaultValue("")
.build();
private static final ParquetSchemaExtractor INSTANCE = new
ParquetSchemaExtractor();
- private static final String ELEMENT = "element";
- private static final String KEY = "key";
- private static final String VALUE = "value";
+ private static final String LIST = "list";
public static ParquetSchemaExtractor getInstance() {
return INSTANCE;
@@ -86,7 +83,6 @@ public class ParquetSchemaExtractor {
*/
public InternalSchema toInternalSchema(Type schema, String parentPath) {
InternalType newDataType = null;
- Type.Repetition currentRepetition = null;
List<InternalField> subFields = null;
PrimitiveType primitiveType;
LogicalTypeAnnotation logicalType;
@@ -195,6 +191,10 @@ public class ParquetSchemaExtractor {
InternalSchema.MetadataKey.DECIMAL_SCALE,
((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation)
logicalType).getScale());
newDataType = InternalType.DECIMAL;
+ } else {
+ newDataType = InternalType.FIXED;
+ metadata.put(
+ InternalSchema.MetadataKey.FIXED_BYTES_SIZE,
primitiveType.getTypeLength());
}
break;
case BINARY:
@@ -233,11 +233,18 @@ public class ParquetSchemaExtractor {
// GroupTypes
logicalType = schema.getLogicalTypeAnnotation();
if (logicalType instanceof
LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
- String schemaName = schema.asGroupType().getName();
- Type.ID schemaId = schema.getId();
+ Type elementType;
+ // check if 3-level or 2-level list encoding
+ if (schema.asGroupType().getFieldCount() == 1
+ && !schema.asGroupType().getType(0).isPrimitive()
+ &&
schema.asGroupType().getType(0).asGroupType().getName().equals(LIST)) {
+ elementType =
schema.asGroupType().getType(0).asGroupType().getType(0);
+ } else {
+ elementType = schema.asGroupType().getType(0);
+ }
InternalSchema elementSchema =
toInternalSchema(
- schema.asGroupType().getType(0),
+ elementType,
SchemaUtils.getFullyQualifiedPath(
parentPath,
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME));
InternalField elementField =
@@ -245,7 +252,7 @@ public class ParquetSchemaExtractor {
.name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
.parentPath(parentPath)
.schema(elementSchema)
- .fieldId(schemaId == null ? null : schemaId.intValue())
+ .fieldId(elementType.getId() == null ? null :
elementType.getId().intValue())
.build();
return InternalSchema.builder()
.name(schema.getName())
@@ -256,10 +263,25 @@ public class ParquetSchemaExtractor {
.build();
} else if (logicalType instanceof
LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
String schemaName = schema.asGroupType().getName();
- Type.ID schemaId = schema.getId();
+ List<Type> keyAndValueTypes =
+ schema.asGroupType().getFields().get(0).asGroupType().getFields();
+ Type keyType = keyAndValueTypes.get(0);
+ InternalSchema keySchema =
+ toInternalSchema(
+ keyType,
+ SchemaUtils.getFullyQualifiedPath(
+ parentPath, InternalField.Constants.MAP_KEY_FIELD_NAME));
+ InternalField keyField =
+ MAP_KEY_FIELD.toBuilder()
+ .parentPath(parentPath)
+ .schema(keySchema)
+ .fieldId(keyType.getId() == null ? null :
keyType.getId().intValue())
+ .build();
+
+ Type valueType = keyAndValueTypes.get(1);
InternalSchema valueSchema =
toInternalSchema(
- schema.asGroupType().getType(0),
+ valueType,
SchemaUtils.getFullyQualifiedPath(
parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME));
InternalField valueField =
@@ -267,14 +289,14 @@ public class ParquetSchemaExtractor {
.name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
.parentPath(parentPath)
.schema(valueSchema)
- .fieldId(schemaId == null ? null : schemaId.intValue())
+ .fieldId(valueType.getId() == null ? null :
valueType.getId().intValue())
.build();
return InternalSchema.builder()
.name(schemaName)
.dataType(InternalType.MAP)
.comment(null)
.isNullable(isNullable(schema.asGroupType()))
- .fields(valueSchema.getFields())
+ .fields(Arrays.asList(keyField, valueField))
.build();
} else {
subFields = new ArrayList<>(schema.asGroupType().getFields().size());
@@ -285,13 +307,6 @@ public class ParquetSchemaExtractor {
toInternalSchema(
parquetField, SchemaUtils.getFullyQualifiedPath(parentPath,
fieldName));
- if (schema.asGroupType().getFields().size()
- == 1) { // TODO Tuple (many subelements in a list)
- newDataType = subFieldSchema.getDataType();
- elementName = subFieldSchema.getName();
- // subFields = subFieldSchema.getFields();
- break;
- }
subFields.add(
InternalField.builder()
.parentPath(parentPath)
@@ -338,10 +353,6 @@ public class ParquetSchemaExtractor {
*/
public Type fromInternalSchema(InternalSchema internalSchema, String
currentPath) {
Type type = null;
- Type listType = null;
- Type mapType = null;
- Type mapKeyType = null;
- Type mapValueType = null;
String fieldName = internalSchema.getName();
InternalType internalType = internalSchema.getDataType();
switch (internalType) {
@@ -459,7 +470,7 @@ public class ParquetSchemaExtractor {
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(field.getName()))
.findFirst()
.orElseThrow(() -> new SchemaExtractorException("Invalid array
schema"));
- listType = fromInternalSchema(elementField.getSchema(),
elementField.getPath());
+ Type listType = fromInternalSchema(elementField.getSchema(),
elementField.getPath());
type =
Types.requiredList().setElementType(listType).named(internalSchema.getName());
// TODO nullable lists
break;
@@ -475,8 +486,8 @@ public class ParquetSchemaExtractor {
field ->
InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(field.getName()))
.findFirst()
.orElseThrow(() -> new SchemaExtractorException("Invalid map
schema"));
- mapKeyType = fromInternalSchema(keyField.getSchema(),
valueField.getPath());
- mapValueType = fromInternalSchema(valueField.getSchema(),
valueField.getPath());
+ Type mapKeyType = fromInternalSchema(keyField.getSchema(),
valueField.getPath());
+ Type mapValueType = fromInternalSchema(valueField.getSchema(),
valueField.getPath());
type =
Types.requiredMap().key(mapKeyType).value(mapValueType).named(internalSchema.getName());
// TODO nullable lists
diff --git
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java
deleted file mode 100644
index e5fd2d07..00000000
---
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.xtable.parquet;
-
-import java.nio.charset.StandardCharsets;
-
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.PrimitiveType;
-
-public class ParquetStatsConverterUtil {
- public static Object convertStatBinaryTypeToLogicalType(
- ColumnChunkMetaData columnMetaData, boolean isMin) {
- Object returnedObj = null;
- PrimitiveType primitiveType = columnMetaData.getPrimitiveType();
- switch (primitiveType.getPrimitiveTypeName()) {
- case BINARY: // TODO check if other primitiveType' needs to be handled
as well
- if (primitiveType.getLogicalTypeAnnotation() != null) {
- if (columnMetaData
- .getPrimitiveType()
- .getLogicalTypeAnnotation()
- .toString()
- .equals("STRING")) {
- returnedObj =
- new String(
- (isMin
- ? (Binary)
columnMetaData.getStatistics().genericGetMin()
- : (Binary)
columnMetaData.getStatistics().genericGetMax())
- .getBytes(),
- StandardCharsets.UTF_8);
- } else {
- returnedObj =
- isMin
- ? columnMetaData.getStatistics().genericGetMin()
- : columnMetaData.getStatistics().genericGetMax();
- }
- } else {
- returnedObj =
- isMin
- ? columnMetaData.getStatistics().genericGetMin()
- : columnMetaData.getStatistics().genericGetMax();
- }
- break;
- default:
- returnedObj =
- isMin
- ? columnMetaData.getStatistics().genericGetMin()
- : columnMetaData.getStatistics().genericGetMax();
- // TODO JSON and DECIMAL... of BINARY primitiveType
- }
- return returnedObj;
- }
-}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
index 46082353..b6f0f330 100644
---
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
@@ -18,33 +18,30 @@
package org.apache.xtable.parquet;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.Objects;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.Value;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
-import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.stat.ColumnStat;
-import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
-import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.schema.SchemaFieldFinder;
@Value
@Builder
@@ -52,9 +49,6 @@ public class ParquetStatsExtractor {
private static final ParquetStatsExtractor INSTANCE = new
ParquetStatsExtractor();
- private static final ParquetSchemaExtractor schemaExtractor =
- ParquetSchemaExtractor.getInstance();
-
private static final ParquetMetadataExtractor parquetMetadataExtractor =
ParquetMetadataExtractor.getInstance();
@@ -62,97 +56,79 @@ public class ParquetStatsExtractor {
return INSTANCE;
}
- private static final ParquetPartitionValueExtractor partitionValueExtractor =
- ParquetPartitionValueExtractor.getInstance();
- private static PathBasedPartitionSpecExtractor partitionSpecExtractor =
- ParquetPartitionSpecExtractor.getInstance();
-
- public static List<ColumnStat> getColumnStatsForaFile(ParquetMetadata
footer) {
- return getStatsForFile(footer).values().stream()
- .flatMap(List::stream)
- .collect(Collectors.toList());
- }
+ @SuppressWarnings("unchecked")
+ private static final Comparator<Object> COMPARABLE_COMPARATOR =
+ (a, b) -> ((Comparable<Object>) a).compareTo(b);
- private static Optional<Long> getMaxFromColumnStats(List<ColumnStat>
columnStats) {
- return columnStats.stream()
- .filter(entry -> entry.getField().getParentPath() == null)
- .map(ColumnStat::getNumValues)
- .filter(numValues -> numValues > 0)
- .max(Long::compareTo);
+ private static ColumnStat mergeColumnChunks(
+ List<ColumnChunkMetaData> chunks, InternalSchema internalSchema) {
+ ColumnChunkMetaData first = chunks.get(0);
+ String dotStringPath = first.getPath().toDotString();
+ InternalField internalField =
+ SchemaFieldFinder.getInstance().findFieldByPath(internalSchema,
dotStringPath);
+ Objects.requireNonNull(internalField, "No field found for path: " +
dotStringPath);
+ PrimitiveType primitiveType = first.getPrimitiveType();
+ long totalNumValues =
chunks.stream().mapToLong(ColumnChunkMetaData::getValueCount).sum();
+ long totalSize =
chunks.stream().mapToLong(ColumnChunkMetaData::getTotalSize).sum();
+ long totalNullValues =
+ chunks.stream()
+ .map(ColumnChunkMetaData::getStatistics)
+ .mapToLong(Statistics::getNumNulls)
+ .sum();
+ Object globalMin =
+ chunks.stream()
+ .filter(c -> c.getStatistics().hasNonNullValue())
+ .map(c -> convertStatsToInternalType(primitiveType,
c.getStatistics().genericGetMin()))
+ .min(COMPARABLE_COMPARATOR)
+ .orElse(null);
+ Object globalMax =
+ chunks.stream()
+ .filter(c -> c.getStatistics().hasNonNullValue())
+ .map(c -> convertStatsToInternalType(primitiveType,
c.getStatistics().genericGetMax()))
+ .max(COMPARABLE_COMPARATOR)
+ .orElse(null);
+ return ColumnStat.builder()
+ .field(internalField)
+ .numValues(totalNumValues)
+ .numNulls(totalNullValues)
+ .totalSize(totalSize)
+ .range(Range.vector(globalMin, globalMax))
+ .build();
}
- public static Map<ColumnDescriptor, List<ColumnStat>>
getStatsForFile(ParquetMetadata footer) {
- Map<ColumnDescriptor, List<ColumnStat>> columnDescStats = new HashMap<>();
+ public static List<ColumnStat> getStatsForFile(
+ ParquetMetadata footer, InternalSchema internalSchema) {
MessageType schema = parquetMetadataExtractor.getSchema(footer);
- List<ColumnChunkMetaData> columns = new ArrayList<>();
- columns =
- footer.getBlocks().stream()
- .flatMap(blockMetaData -> blockMetaData.getColumns().stream())
- .collect(Collectors.toList());
- columnDescStats =
- columns.stream()
- .collect(
- Collectors.groupingBy(
- columnMetaData ->
-
schema.getColumnDescription(columnMetaData.getPath().toArray()),
- Collectors.mapping(
- columnMetaData ->
- ColumnStat.builder()
- .field(
- InternalField.builder()
-
.name(columnMetaData.getPrimitiveType().getName())
- .fieldId(
-
columnMetaData.getPrimitiveType().getId() == null
- ? null
- : columnMetaData
- .getPrimitiveType()
- .getId()
- .intValue())
- .parentPath(null)
- .schema(
- schemaExtractor.toInternalSchema(
-
columnMetaData.getPrimitiveType(),
-
columnMetaData.getPath().toDotString()))
- .build())
- .numValues(columnMetaData.getValueCount())
- .totalSize(columnMetaData.getTotalSize())
- .range(
- Range.vector(
- ParquetStatsConverterUtil
-
.convertStatBinaryTypeToLogicalType(
- columnMetaData,
- true), // if stats are string
convert to
- // litteraly a string stat and
- // store to range
- ParquetStatsConverterUtil
-
.convertStatBinaryTypeToLogicalType(
- columnMetaData, false)))
- .build(),
- Collectors.toList())));
- return columnDescStats;
+ return footer.getBlocks().stream()
+ .flatMap(block -> block.getColumns().stream())
+ .collect(
+ Collectors.groupingBy(chunk ->
schema.getColumnDescription(chunk.getPath().toArray())))
+ .values()
+ .stream()
+ .map(columnChunks -> mergeColumnChunks(columnChunks, internalSchema))
+ .collect(Collectors.toList());
}
- public static InternalDataFile toInternalDataFile(Configuration hadoopConf,
Path parentPath)
- throws IOException {
- FileSystem fs = FileSystem.get(hadoopConf);
- FileStatus file = fs.getFileStatus(parentPath);
- ParquetMetadata footer =
parquetMetadataExtractor.readParquetMetadata(hadoopConf, parentPath);
- List<ColumnStat> columnStatsForAFile = getColumnStatsForaFile(footer);
- List<PartitionValue> partitionValues =
- partitionValueExtractor.extractPartitionValues(
- partitionSpecExtractor.spec(
- partitionValueExtractor.extractSchemaForParquetPartitions(
- parquetMetadataExtractor.readParquetMetadata(hadoopConf,
file.getPath()),
- file.getPath().toString())),
- parentPath.toString());
- return InternalDataFile.builder()
- .physicalPath(parentPath.toString())
- .fileFormat(FileFormat.APACHE_PARQUET)
- .partitionValues(partitionValues)
- .fileSizeBytes(file.getLen())
- .recordCount(getMaxFromColumnStats(columnStatsForAFile).orElse(0L))
- .columnStats(columnStatsForAFile)
- .lastModified(file.getModificationTime())
- .build();
+ private static Object convertStatsToInternalType(PrimitiveType
primitiveType, Object value) {
+ LogicalTypeAnnotation annotation =
primitiveType.getLogicalTypeAnnotation();
+
+ // DECIMAL: convert unscaled backing value → BigDecimal regardless of
primitive type
+ if (annotation instanceof
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+ int scale = ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation)
annotation).getScale();
+ switch (primitiveType.getPrimitiveTypeName()) {
+ case INT32:
+ case INT64:
+ return new BigDecimal(BigInteger.valueOf(((Number)
value).longValue()), scale);
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ return new BigDecimal(new BigInteger(((Binary) value).getBytes()),
scale);
+ default:
+ return value;
+ }
+ } else if (annotation instanceof
LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
+ // STRING: convert binary → String
+ return new String(((Binary) value).getBytes(), StandardCharsets.UTF_8);
+ }
+ return value;
}
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
index c5d196fc..8dbceaa5 100644
---
a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
+++
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.stream.Stream;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
@@ -32,6 +33,9 @@ import org.apache.parquet.schema.Type.Repetition;
import org.apache.parquet.schema.Types;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
@@ -82,6 +86,20 @@ public class TestParquetSchemaExtractor {
Assertions.assertEquals(decimalType,
schemaExtractor.toInternalSchema(decimalPrimitive, null));
+ // test fixed size byte array
+ Map<InternalSchema.MetadataKey, Object> fixedMetadata =
+ Collections.singletonMap(InternalSchema.MetadataKey.FIXED_BYTES_SIZE,
16);
+ InternalSchema fixedType =
+ InternalSchema.builder()
+ .name("fixed")
+ .dataType(InternalType.FIXED)
+ .isNullable(false)
+ .metadata(fixedMetadata)
+ .build();
+ Type fixedPrimitiveType =
+
Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(16).named("fixed");
+ Assertions.assertEquals(fixedType,
schemaExtractor.toInternalSchema(fixedPrimitiveType, null));
+
// tests for timestamp and date
InternalSchema testDate =
InternalSchema.builder().name("date").dataType(InternalType.DATE).isNullable(false).build();
@@ -124,12 +142,18 @@ public class TestParquetSchemaExtractor {
Types.required(PrimitiveTypeName.INT64)
.as(LogicalTypeAnnotation.timestampType(false,
LogicalTypeAnnotation.TimeUnit.MILLIS))
.named("timestamp_millis");
+ Type timestampMicrosPrimitiveType =
+ Types.required(PrimitiveTypeName.INT64)
+ .as(LogicalTypeAnnotation.timestampType(true,
LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named("timestamp_micros");
Type timestampNanosPrimitiveType =
Types.required(PrimitiveTypeName.INT64)
.as(LogicalTypeAnnotation.timestampType(false,
LogicalTypeAnnotation.TimeUnit.NANOS))
.named("timestamp_nanos");
Assertions.assertEquals(
testTimestampMillis,
schemaExtractor.toInternalSchema(timestampMillisPrimitiveType, null));
+ Assertions.assertEquals(
+ testTimestampMicros,
schemaExtractor.toInternalSchema(timestampMicrosPrimitiveType, null));
Assertions.assertEquals(
testTimestampNanos,
schemaExtractor.toInternalSchema(timestampNanosPrimitiveType, null));
@@ -142,9 +166,7 @@ public class TestParquetSchemaExtractor {
@Test
public void testGroupTypes() {
-
// map
-
InternalSchema internalMap =
InternalSchema.builder()
.name("map")
@@ -153,8 +175,8 @@ public class TestParquetSchemaExtractor {
.fields(
Arrays.asList(
InternalField.builder()
- .name("key")
- .parentPath("_one_field_value")
+ .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+ .parentPath(null)
.schema(
InternalSchema.builder()
.name("key")
@@ -164,8 +186,8 @@ public class TestParquetSchemaExtractor {
.defaultValue(null)
.build(),
InternalField.builder()
- .name("value")
- .parentPath("_one_field_value")
+ .name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
+ .parentPath(null)
.schema(
InternalSchema.builder()
.name("value")
@@ -278,6 +300,40 @@ public class TestParquetSchemaExtractor {
.isNullable(false)
.fields(
Arrays.asList(
+ InternalField.builder()
+ .name("map")
+ .schema(
+ InternalSchema.builder()
+ .name("map")
+ .isNullable(false)
+ .dataType(InternalType.MAP)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+
.name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+ .parentPath("map")
+ .schema(
+ InternalSchema.builder()
+ .name("key")
+
.dataType(InternalType.FLOAT)
+ .isNullable(false)
+ .build())
+ .defaultValue(null)
+ .build(),
+ InternalField.builder()
+ .name(
+ InternalField.Constants
+ .MAP_VALUE_FIELD_NAME) //
"value")
+ .parentPath("map")
+ .schema(
+ InternalSchema.builder()
+ .name("value")
+ .dataType(InternalType.INT)
+ .isNullable(false)
+ .build())
+ .build()))
+ .build())
+ .build(),
InternalField.builder()
.name("my_list")
.schema(
@@ -294,7 +350,7 @@ public class TestParquetSchemaExtractor {
InternalSchema.builder()
.name("element")
.dataType(InternalType.INT)
- .isNullable(true)
+ .isNullable(false)
.build())
.build()))
.build())
@@ -331,7 +387,7 @@ public class TestParquetSchemaExtractor {
.named("my_list");
MessageType messageType =
Types.buildMessage()
- // .addField(testMap)
+ .addField(testMap)
.addField(listType)
.addField(testGroupType)
.named("my_record");
@@ -339,4 +395,66 @@ public class TestParquetSchemaExtractor {
Assertions.assertEquals(internalMap,
schemaExtractor.toInternalSchema(testMap, null));
Assertions.assertEquals(internalSchema,
schemaExtractor.toInternalSchema(messageType, null));
}
+
+ static Stream<Arguments> listEncodings() {
+ return Stream.of(
+ Arguments.of(
+ "3-level required element",
+ Types.requiredList()
+
.element(Types.required(PrimitiveTypeName.INT32).named("element"))
+ .named("my_list"),
+ false),
+ Arguments.of(
+ "3-level optional element",
+ Types.requiredList()
+
.element(Types.optional(PrimitiveTypeName.INT32).named("element"))
+ .named("my_list"),
+ true),
+ Arguments.of(
+ "2-level required element",
+ Types.buildGroup(Type.Repetition.REQUIRED)
+ .as(LogicalTypeAnnotation.listType())
+ .addField(
+ Types.primitive(PrimitiveTypeName.INT32,
Type.Repetition.REQUIRED)
+ .named("element"))
+ .named("my_list"),
+ false),
+ Arguments.of(
+ "2-level optional element",
+ Types.buildGroup(Type.Repetition.REQUIRED)
+ .as(LogicalTypeAnnotation.listType())
+ .addField(
+ Types.primitive(PrimitiveTypeName.INT32,
Type.Repetition.OPTIONAL)
+ .named("element"))
+ .named("my_list"),
+ true));
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("listEncodings")
+ void testListEncoding(String description, GroupType parquetSchema, boolean
elementNullable) {
+ InternalSchema expected = listSchema(elementNullable);
+ Assertions.assertEquals(expected,
schemaExtractor.toInternalSchema(parquetSchema, null));
+ }
+
+ /** Builds the expected InternalSchema for a required LIST of INT32
elements. */
+ private static InternalSchema listSchema(boolean elementNullable) {
+ return InternalSchema.builder()
+ .name("my_list")
+ .dataType(InternalType.LIST)
+ .isNullable(false)
+ .fields(
+ Collections.singletonList(
+ InternalField.builder()
+ .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+ .parentPath(null)
+ .schema(
+ InternalSchema.builder()
+ .name("element")
+ .dataType(InternalType.INT)
+ .isNullable(elementNullable)
+ .build())
+ .build()))
+ .build();
+ }
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
index 5047827f..75c7f01c 100644
---
a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
+++
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
@@ -20,436 +20,549 @@ package org.apache.xtable.parquet;
import static org.apache.parquet.column.Encoding.BIT_PACKED;
import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import java.io.File;
import java.io.IOException;
+import java.math.BigDecimal;
import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.statistics.BinaryStatistics;
-import org.apache.parquet.column.statistics.BooleanStatistics;
import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
-import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
-import org.apache.parquet.schema.Type.Repetition;
-import org.junit.jupiter.api.Assertions;
+import org.apache.parquet.schema.Types;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
-import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.stat.ColumnStat;
-import org.apache.xtable.model.stat.Range;
-import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.InternalDataFile;
-public class TestParquetStatsExtractor {
-
- private static final ParquetSchemaExtractor schemaExtractor =
- ParquetSchemaExtractor.getInstance();
- private static final ParquetStatsExtractor statsExtractor =
ParquetStatsExtractor.getInstance();
+class TestParquetStatsExtractor {
+ private final Configuration conf = new Configuration();
@TempDir static java.nio.file.Path tempDir = Paths.get("./");
- public static List<ColumnStat> initBooleanFileTest(File file) throws
IOException {
- // create the parquet file by parsing a schema
+ /** Write a two-row single-column Parquet file; Parquet computes min/max
automatically. */
+ private Path writeParquetFile(File file, MessageType schema, Object minVal,
Object maxVal)
+ throws IOException {
Path path = new Path(file.toURI());
- Configuration configuration = new Configuration();
-
- MessageType schema =
- MessageTypeParser.parseMessageType("message m { required group a
{required boolean b;}}");
- String[] columnPath = {"a", "b"};
- ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
- CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
- BooleanStatistics stats = new BooleanStatistics();
- stats.updateStats(true);
- stats.updateStats(false);
-
- // write the string columned file
-
- ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
- w.start();
- w.startBlock(3);
- w.startColumn(c1, 5, codec);
- w.writeDataPage(2, 4, BytesInput.fromInt(1), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
- w.writeDataPage(3, 4, BytesInput.fromInt(0), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
- w.endColumn();
- w.endBlock();
- w.startBlock(4);
- w.startColumn(c1, 8, codec);
-
- w.writeDataPage(7, 4, BytesInput.fromInt(0), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
- w.endColumn();
- w.endBlock();
- w.end(new HashMap<String, String>());
-
- // reconstruct the stats for the InternalDataFile testing object
-
- boolean minStat = stats.genericGetMin();
-
- boolean maxStat = stats.genericGetMax();
- PrimitiveType primitiveType =
- new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BOOLEAN, "b");
- List<Integer> col1NumValTotSize =
- new ArrayList<>(Arrays.asList(5, 8)); // (5, 8)// start column indexes
- List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(54, 27));
- List<ColumnStat> testColumnStats = new ArrayList<>();
- String[] columnDotPath = {"a.b", "a.b"};
- for (int i = 0; i < columnDotPath.length; i++) {
- testColumnStats.add(
- ColumnStat.builder()
- .field(
- InternalField.builder()
- .name(primitiveType.getName())
- .parentPath(null)
- .schema(schemaExtractor.toInternalSchema(primitiveType,
columnDotPath[i]))
- .build())
- .numValues(col1NumValTotSize.get(i))
- .totalSize(col2NumValTotSize.get(i))
- .range(Range.vector(minStat, maxStat))
- .build());
+ GroupWriteSupport.setSchema(schema, conf);
+ SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+ String fieldName = schema.getFields().get(0).getName();
+ try (ParquetWriter<Group> writer =
ExampleParquetWriter.builder(path).withConf(conf).build()) {
+ writer.write(addValue(factory.newGroup(), fieldName, minVal));
+ writer.write(addValue(factory.newGroup(), fieldName, maxVal));
}
-
- return testColumnStats;
+ return path;
}
- public static List<ColumnStat> initStringFileTest(File file) throws
IOException {
- // create the parquet file by parsing a schema
- Path path = new Path(file.toURI());
- Configuration configuration = new Configuration();
-
- MessageType schema =
- MessageTypeParser.parseMessageType(
- "message m { required group a {required fixed_len_byte_array(10)
b;}}");
- String[] columnPath = {"a", "b"};
- ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
- CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
- BinaryStatistics stats = new BinaryStatistics();
- stats.updateStats(Binary.fromString("1"));
- stats.updateStats(Binary.fromString("2"));
- stats.updateStats(Binary.fromString("5"));
-
- byte[] bytes1 = "First string".getBytes();
- byte[] bytes2 = "Second string".getBytes();
-
- // write the string columned file
-
- ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
- w.start();
- w.startBlock(3);
- w.startColumn(c1, 5, codec);
-
- w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
- w.writeDataPage(3, 4, BytesInput.from(bytes2), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
- w.endColumn();
- w.endBlock();
- w.startBlock(4);
- w.startColumn(c1, 8, codec);
-
- w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
- w.endColumn();
- w.endBlock();
- w.end(new HashMap<String, String>());
-
- // reconstruct the stats for the InternalDataFile testing object
- BinaryStatistics stats_clone = new BinaryStatistics();
- Binary originalBinary1 = Binary.fromString("1");
- Binary originalBinary2 = Binary.fromString("2");
- Binary originalBinary5 = Binary.fromString("5");
- stats_clone.updateStats(Binary.fromByteArray(originalBinary1.getBytes()));
- stats_clone.updateStats(Binary.fromByteArray(originalBinary2.getBytes()));
- stats_clone.updateStats(Binary.fromByteArray(originalBinary5.getBytes()));
-
- Binary minStat = stats_clone.genericGetMin();
-
- Binary maxStat = stats_clone.genericGetMax();
- PrimitiveType primitiveType =
- new PrimitiveType(Repetition.REQUIRED,
PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, "b");
- List<Integer> col1NumValTotSize = new ArrayList<>(Arrays.asList(5, 8));
- List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(71, 36));
- List<ColumnStat> testColumnStats = new ArrayList<>();
- String[] columnDotPath = {"a.b", "a.b"};
- for (int i = 0; i < columnDotPath.length; i++) {
- testColumnStats.add(
- ColumnStat.builder()
- .field(
- InternalField.builder()
- .name(primitiveType.getName())
- .parentPath(null)
- .schema(schemaExtractor.toInternalSchema(primitiveType,
columnDotPath[i]))
- .build())
- .numValues(col1NumValTotSize.get(i))
- .totalSize(col2NumValTotSize.get(i))
- .range(Range.vector(minStat, maxStat))
- .build());
+ private static Group addValue(Group group, String fieldName, Object value) {
+ if (value instanceof Integer) {
+ group.add(fieldName, (Integer) value);
+ } else if (value instanceof Long) {
+ group.add(fieldName, (Long) value);
+ } else if (value instanceof Float) {
+ group.add(fieldName, (Float) value);
+ } else if (value instanceof Double) {
+ group.add(fieldName, (Double) value);
+ } else if (value instanceof Boolean) {
+ group.add(fieldName, (Boolean) value);
+ } else if (value instanceof Binary) {
+ group.add(fieldName, (Binary) value);
+ } else {
+ throw new IllegalArgumentException("Unsupported: " + value.getClass());
}
+ return group;
+ }
- return testColumnStats;
+ /** Read all column stats from a written file. */
+ private List<ColumnStat> readStats(Path path) {
+ ParquetMetadata footer =
ParquetMetadataExtractor.getInstance().readParquetMetadata(conf, path);
+ return ParquetStatsExtractor.getStatsForFile(
+ footer,
+ ParquetSchemaExtractor.getInstance()
+ .toInternalSchema(footer.getFileMetaData().getSchema(), ""));
}
- public static List<ColumnStat> initBinaryFileTest(File file) throws
IOException {
- // create the parquet file by parsing a schema
- Path path = new Path(file.toURI());
- Configuration configuration = new Configuration();
+ private static IntStatistics intStats(int min, int max) {
+ IntStatistics s = new IntStatistics();
+ s.updateStats(min);
+ s.updateStats(max);
+ return s;
+ }
- MessageType schema =
- MessageTypeParser.parseMessageType("message m { required group a
{required binary b;}}");
- String[] columnPath = {"a", "b"};
- ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
-
- byte[] bytes1 = {0, 1, 2, 3};
- byte[] bytes2 = {2, 3, 4, 5};
- CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
- BinaryStatistics stats = new BinaryStatistics();
- stats.updateStats(Binary.fromString("1"));
- stats.updateStats(Binary.fromString("2"));
- stats.updateStats(Binary.fromString("5"));
-
- // to simplify the test we keep the same stats for both columns
- ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
- w.start();
- w.startBlock(3);
- w.startColumn(c1, 5, codec);
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("primitiveTypeTestCases")
+ void testPrimitiveTypeStats(
+ String name,
+ MessageType schema,
+ Object minWrite,
+ Object maxWrite,
+ InternalType expectedType,
+ Object expectedMin,
+ Object expectedMax)
+ throws IOException {
+ File file = tempDir.resolve("parquet-test-" + name).toFile();
+ Path path = writeParquetFile(file, schema, minWrite, maxWrite);
+ List<ColumnStat> columnStats = readStats(path);
+ assertEquals(1, columnStats.size());
+ ColumnStat stat = columnStats.get(0);
+ assertEquals(expectedType, stat.getField().getSchema().getDataType());
+ assertEquals(expectedMin, stat.getRange().getMinValue());
+ assertEquals(expectedMax, stat.getRange().getMaxValue());
+ }
- w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
+ static Stream<Arguments> primitiveTypeTestCases() {
+ return Stream.of(
+ // BOOLEAN
+ Arguments.of(
+ "boolean",
+ new MessageType("m",
Types.required(PrimitiveTypeName.BOOLEAN).named("b")),
+ true, // minWrite
+ false, // maxWrite — Parquet stores {false, true} as {min=false,
max=true}
+ InternalType.BOOLEAN,
+ false,
+ true),
+
+ // INT32 plain (no logical type) → INT
+ Arguments.of(
+ "int32-plain",
+ new MessageType("m",
Types.required(PrimitiveTypeName.INT32).named("b")),
+ 1,
+ 100,
+ InternalType.INT,
+ 1,
+ 100),
+
+ // INT64 plain (no logical type) → LONG
+ Arguments.of(
+ "int64-plain",
+ new MessageType("message",
Types.required(PrimitiveTypeName.INT64).named("field1")),
+ 100L,
+ 500L,
+ InternalType.LONG,
+ 100L,
+ 500L),
+
+ // INT64 + Timestamp UTC MICROS → TIMESTAMP
+ Arguments.of(
+ "int64-timestamp-utc-micros",
+ new MessageType(
+ "message",
+ Types.required(PrimitiveTypeName.INT64)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ true, LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named("field1")),
+ 1_000_000L,
+ 2_000_000L,
+ InternalType.TIMESTAMP,
+ 1_000_000L,
+ 2_000_000L),
+
+ // INT64 + Timestamp UTC MILLIS → TIMESTAMP
+ Arguments.of(
+ "int64-timestamp-utc-millis",
+ new MessageType(
+ "message",
+ Types.required(PrimitiveTypeName.INT64)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("field1")),
+ 1000L,
+ 2000L,
+ InternalType.TIMESTAMP,
+ 1000L,
+ 2000L),
+
+ // INT64 + Timestamp UTC NANOS → TIMESTAMP
+ Arguments.of(
+ "int64-timestamp-utc-nanos",
+ new MessageType(
+ "message",
+ Types.required(PrimitiveTypeName.INT64)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ true, LogicalTypeAnnotation.TimeUnit.NANOS))
+ .named("field1")),
+ 1_000_000_000L,
+ 2_000_000_000L,
+ InternalType.TIMESTAMP,
+ 1_000_000_000L,
+ 2_000_000_000L),
+
+ // INT64 + Timestamp NTZ MICROS → TIMESTAMP_NTZ
+ Arguments.of(
+ "int64-timestamp-ntz-micros",
+ new MessageType(
+ "message",
+ Types.required(PrimitiveTypeName.INT64)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ false, LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named("field1")),
+ 1_000_000L,
+ 2_000_000L,
+ InternalType.TIMESTAMP_NTZ,
+ 1_000_000L,
+ 2_000_000L),
+
+ // DOUBLE
+ Arguments.of(
+ "double",
+ new MessageType("message",
Types.required(PrimitiveTypeName.DOUBLE).named("field1")),
+ 1.5,
+ 9.9,
+ InternalType.DOUBLE,
+ 1.5,
+ 9.9),
+
+ // FLOAT
+ Arguments.of(
+ "float",
+ new MessageType("message",
Types.required(PrimitiveTypeName.FLOAT).named("field1")),
+ 1.0f,
+ 3.14f,
+ InternalType.FLOAT,
+ 1.0f,
+ 3.14f),
+
+ // INT32 + Date → DATE
+ Arguments.of(
+ "int32-date",
+ new MessageType(
+ "message",
+ Types.required(PrimitiveTypeName.INT32)
+ .as(LogicalTypeAnnotation.dateType())
+ .named("field1")),
+ 18000,
+ 19000,
+ InternalType.DATE,
+ 18000,
+ 19000),
+
+ // INT32 + Time MILLIS → INT
+ Arguments.of(
+ "int32-time-millis",
+ new MessageType(
+ "message",
+ Types.required(PrimitiveTypeName.INT32)
+ .as(LogicalTypeAnnotation.timeType(true,
LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("field1")),
+ 0,
+ 86_400_000,
+ InternalType.INT,
+ 0,
+ 86_400_000),
+
+ // BINARY plain (no logical type) → BYTES; range stays Binary
+ Arguments.of(
+ "binary-plain",
+ new MessageType("message",
Types.required(PrimitiveTypeName.BINARY).named("field1")),
+ Binary.fromString("aaa"),
+ Binary.fromString("zzz"),
+ InternalType.BYTES,
+ Binary.fromString("aaa"),
+ Binary.fromString("zzz")),
+
+ // BINARY + STRING → STRING; range values are Java String
+ Arguments.of(
+ "binary-string",
+ new MessageType(
+ "message",
+ Types.required(PrimitiveTypeName.BINARY)
+ .as(LogicalTypeAnnotation.stringType())
+ .named("field1")),
+ Binary.fromString("apple"),
+ Binary.fromString("zebra"),
+ InternalType.STRING,
+ "apple",
+ "zebra"),
+
+ // BINARY + JSON → BYTES; range stays Binary
+ Arguments.of(
+ "binary-json",
+ new MessageType(
+ "message",
+ Types.required(PrimitiveTypeName.BINARY)
+ .as(LogicalTypeAnnotation.jsonType())
+ .named("field1")),
+ Binary.fromString("{\"a\":1}"),
+ Binary.fromString("{\"z\":99}"),
+ InternalType.BYTES,
+ Binary.fromString("{\"a\":1}"),
+ Binary.fromString("{\"z\":99}")),
+
+ // FIXED_LEN_BYTE_ARRAY(16) + UUID → UUID; range stays Binary
+ Arguments.of(
+ "fixed-uuid",
+ new MessageType(
+ "message",
+ Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ .length(16)
+ .as(LogicalTypeAnnotation.uuidType())
+ .named("field1")),
+ Binary.fromConstantByteArray(new byte[16]),
+ Binary.fromConstantByteArray(
+ new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}),
+ InternalType.UUID,
+ Binary.fromConstantByteArray(new byte[16]),
+ Binary.fromConstantByteArray(
+ new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1})),
+
+ // FIXED_LEN_BYTE_ARRAY(8) → BYTES; range stays Binary
+ Arguments.of(
+ "fixed-bytes",
+ new MessageType(
+ "message",
+
Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(8).named("field1")),
+ Binary.fromConstantByteArray(new byte[] {0, 0, 0, 0, 0, 0, 0, 1}),
+ Binary.fromConstantByteArray(new byte[] {0, 0, 0, 0, 0, 1, 0, 0}),
+ InternalType.FIXED,
+ Binary.fromConstantByteArray(new byte[] {0, 0, 0, 0, 0, 0, 0, 1}),
+ Binary.fromConstantByteArray(new byte[] {0, 0, 0, 0, 0, 1, 0,
0})));
+ }
- w.writeDataPage(3, 4, BytesInput.from(bytes2), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("decimalTypeTestCases")
+ void testDecimalTypeStats(
+ String name,
+ MessageType schema,
+ Object minWrite,
+ Object maxWrite,
+ int expectedPrecision,
+ int expectedScale,
+ Object expectedMin,
+ Object expectedMax)
+ throws IOException {
+ File file = tempDir.resolve("parquet-test-" + name).toFile();
+ Path path = writeParquetFile(file, schema, minWrite, maxWrite);
+ List<ColumnStat> columnStats = readStats(path);
+ assertEquals(1, columnStats.size());
+ ColumnStat stat = columnStats.get(0);
+ assertEquals(InternalType.DECIMAL,
stat.getField().getSchema().getDataType(), name);
+ Map<InternalSchema.MetadataKey, Object> metadata =
stat.getField().getSchema().getMetadata();
+ assertEquals(
+ expectedPrecision,
+ metadata.get(InternalSchema.MetadataKey.DECIMAL_PRECISION),
+ name + " precision");
+ assertEquals(expectedScale,
metadata.get(InternalSchema.MetadataKey.DECIMAL_SCALE));
+ assertEquals(expectedMin, stat.getRange().getMinValue());
+ assertEquals(expectedMax, stat.getRange().getMaxValue());
+ }
- w.endColumn();
- w.endBlock();
- w.startBlock(4);
+ static Stream<Arguments> decimalTypeTestCases() {
+ // Note: LogicalTypeAnnotation.decimalType(scale, precision) — scale is
first argument
+ return Stream.of(
+ // INT32 + Decimal(precision=9, scale=2) — INT32 supports precision up
to 9
+ Arguments.of(
+ "int32-decimal",
+ new MessageType(
+ "message",
+ Types.required(PrimitiveTypeName.INT32)
+ .as(LogicalTypeAnnotation.decimalType(2, 9))
+ .named("decimal_field")),
+ 100,
+ 9999,
+ 9,
+ 2,
+ new BigDecimal("1.00"),
+ new BigDecimal("99.99")),
+
+ // INT64 + Decimal(precision=18, scale=6)
+ Arguments.of(
+ "int64-decimal",
+ new MessageType(
+ "message",
+ Types.required(PrimitiveTypeName.INT64)
+ .as(LogicalTypeAnnotation.decimalType(6, 18))
+ .named("decimal_field")),
+ 1_000_000L,
+ 9_999_999L,
+ 18,
+ 6,
+ new BigDecimal("1.000000"),
+ new BigDecimal("9.999999")),
+
+ // FIXED_LEN_BYTE_ARRAY(8) + Decimal(precision=18, scale=6); range is
BigDecimal
+ Arguments.of(
+ "fixed-decimal",
+ new MessageType(
+ "message",
+ Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ .length(8)
+ .as(LogicalTypeAnnotation.decimalType(6, 18))
+ .named("decimal_field")),
+ Binary.fromConstantByteArray(new byte[] {0, 0, 0, 0, 0, 0, 0, 1}),
+ Binary.fromConstantByteArray(new byte[] {0, 0, 0, 0, 0, 1, 0, 0}),
+ 18,
+ 6,
+ new BigDecimal("0.000001"),
+ new BigDecimal("0.065536")),
+
+ // BINARY + Decimal; range is BigDecimal
+ Arguments.of(
+ "binary-decimal",
+ new MessageType(
+ "message",
+ Types.required(PrimitiveTypeName.BINARY)
+ .as(LogicalTypeAnnotation.decimalType(4, 20))
+ .named("decimal_field")),
+ Binary.fromConstantByteArray(new byte[] {0, 0, 0, 100}),
+ Binary.fromConstantByteArray(new byte[] {0, 0, 3, (byte) 0xe8}),
+ 20,
+ 4,
+ new BigDecimal("0.0100"),
+ new BigDecimal("0.1000")));
+ }
- w.startColumn(c1, 1, codec);
- w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
+ @Test
+ void testMultipleColumnsStat() throws IOException {
+ MessageType schema =
+ new MessageType(
+ "message",
+ Types.required(PrimitiveTypeName.INT32).named("col_int"),
+ Types.required(PrimitiveTypeName.DOUBLE).named("col_double"));
- w.endColumn();
- w.endBlock();
- w.end(new HashMap<String, String>());
-
- // reconstruct the stats for the InternalDataFile testing object
- BinaryStatistics stats_clone = new BinaryStatistics();
- Binary originalBinary1 = Binary.fromString("1");
- Binary originalBinary2 = Binary.fromString("2");
- Binary originalBinary5 = Binary.fromString("5");
- stats_clone.updateStats(Binary.fromByteArray(originalBinary1.getBytes()));
- stats_clone.updateStats(Binary.fromByteArray(originalBinary2.getBytes()));
- stats_clone.updateStats(Binary.fromByteArray(originalBinary5.getBytes()));
-
- Binary minStat = stats_clone.genericGetMin();
- Binary maxStat = stats_clone.genericGetMax();
- PrimitiveType primitiveType =
- new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "b");
- List<Integer> col1NumValTotSize = new ArrayList<>(Arrays.asList(5, 1));
- List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(54, 27));
- List<ColumnStat> testColumnStats = new ArrayList<>();
- String[] columnDotPath = {"a.b", "a.b"};
- for (int i = 0; i < columnDotPath.length; i++) {
- testColumnStats.add(
- ColumnStat.builder()
- .field(
- InternalField.builder()
- .name(primitiveType.getName())
- .parentPath(null)
- .schema(schemaExtractor.toInternalSchema(primitiveType,
columnDotPath[i]))
- .build())
- .numValues(col1NumValTotSize.get(i))
- .totalSize(col2NumValTotSize.get(i))
- .range(Range.vector(minStat, maxStat))
- .build());
+ File file = tempDir.resolve("parquet-test-multi-col").toFile();
+ Path path = new Path(file.toURI());
+ GroupWriteSupport.setSchema(schema, conf);
+ SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+ try (ParquetWriter<Group> writer =
ExampleParquetWriter.builder(path).withConf(conf).build()) {
+ Group row1 = factory.newGroup();
+ row1.add("col_int", 5);
+ row1.add("col_double", 1.5);
+ Group row2 = factory.newGroup();
+ row2.add("col_int", 50);
+ row2.add("col_double", 9.9);
+ writer.write(row1);
+ writer.write(row2);
}
- return testColumnStats;
+ List<ColumnStat> columnStats = readStats(path);
+ assertEquals(2, columnStats.size());
+
+ ColumnStat intStat =
+ columnStats.stream()
+ .filter(s -> s.getField().getName().equals("col_int"))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("col_int not found"));
+ ColumnStat doubleStat =
+ columnStats.stream()
+ .filter(s -> s.getField().getName().equals("col_double"))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("col_double not found"));
+
+ assertEquals(InternalType.INT,
intStat.getField().getSchema().getDataType());
+ assertEquals(5, intStat.getRange().getMinValue());
+ assertEquals(50, intStat.getRange().getMaxValue());
+
+ assertEquals(InternalType.DOUBLE,
doubleStat.getField().getSchema().getDataType());
+ assertEquals(1.5, doubleStat.getRange().getMinValue());
+ assertEquals(9.9, doubleStat.getRange().getMaxValue());
}
- public static List<ColumnStat> initIntFileTest(File file) throws IOException
{
- // create the parquet file by parsing a schema
- Path path = new Path(file.toURI());
- Configuration configuration = new Configuration();
-
+ @Test
+ void testMultipleRowGroupsStat() throws IOException {
MessageType schema =
- MessageTypeParser.parseMessageType("message m { required group a
{required int32 b;}}");
- String[] columnPath = {"a", "b"};
- ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
-
- CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
-
- IntStatistics stats = new IntStatistics();
- stats.updateStats(1);
- stats.updateStats(2);
- stats.updateStats(5);
+ new MessageType("message",
Types.required(PrimitiveTypeName.INT32).named("field1"));
+ ColumnDescriptor col = schema.getColumns().get(0);
+ IntStatistics stats1 = intStats(10, 100);
+ IntStatistics stats2 = intStats(200, 500);
- // to simplify the test we keep the same stats for both columns
- ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+ File file = tempDir.resolve("parquet-test-multi-rg").toFile();
+ Path path = new Path(file.toURI());
+ ParquetFileWriter w = new ParquetFileWriter(conf, schema, path);
w.start();
- w.startBlock(3);
-
- w.startColumn(c1, 2, codec);
-
- w.writeDataPage(3, 3, BytesInput.fromInt(3), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
-
- w.writeDataPage(3, 3, BytesInput.fromInt(2), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
+ w.startBlock(10);
+ w.startColumn(col, 10, CompressionCodecName.UNCOMPRESSED);
+ w.writeDataPage(10, 4, BytesInput.fromInt(0), stats1, BIT_PACKED,
BIT_PACKED, PLAIN);
w.endColumn();
w.endBlock();
- w.startBlock(4);
-
- w.startColumn(c1, 1, codec);
-
- w.writeDataPage(3, 3, BytesInput.fromInt(1), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
+ w.startBlock(5);
+ w.startColumn(col, 5, CompressionCodecName.UNCOMPRESSED);
+ w.writeDataPage(5, 4, BytesInput.fromInt(0), stats2, BIT_PACKED,
BIT_PACKED, PLAIN);
w.endColumn();
w.endBlock();
- w.end(new HashMap<String, String>());
-
- // reconstruct the stats for the InternalDataFile testing object
-
- java.lang.Integer minStat = stats.genericGetMin();
- java.lang.Integer maxStat = stats.genericGetMax();
- PrimitiveType primitiveType =
- new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.INT32, "b");
- List<Integer> col1NumValTotSize = new ArrayList<>(Arrays.asList(2, 1));
- List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(54, 27));
- List<ColumnStat> testColumnStats = new ArrayList<>();
- String[] columnDotPath = {"a.b", "a.b"};
- for (int i = 0; i < columnDotPath.length; i++) {
- testColumnStats.add(
- ColumnStat.builder()
- .field(
- InternalField.builder()
- .name(primitiveType.getName())
- .parentPath(null)
- .schema(schemaExtractor.toInternalSchema(primitiveType,
columnDotPath[i]))
- .build())
- .numValues(col1NumValTotSize.get(i))
- .totalSize(col2NumValTotSize.get(i))
- .range(Range.vector(minStat, maxStat))
- .build());
- }
-
- return testColumnStats;
+ w.end(new HashMap<>());
+
+ List<ColumnStat> columnStats = readStats(path);
+ assertEquals(1, columnStats.size());
+ ColumnStat columnStat = columnStats.get(0);
+ assertEquals(10, columnStat.getRange().getMinValue());
+ assertEquals(500, columnStat.getRange().getMaxValue());
+ assertEquals(15L, columnStat.getNumValues());
}
@Test
- public void testInternalDataFileStringStat() throws IOException {
-
- Configuration configuration = new Configuration();
-
- java.nio.file.Path path = tempDir.resolve("parquet-test-string-file");
- File file = path.toFile();
- file.deleteOnExit();
- List<ColumnStat> testColumnStats = initStringFileTest(file);
- Path hadoopPath = new Path(file.toURI());
- // statsExtractor toInternalDataFile testing
- InternalDataFile internalDataFile =
- statsExtractor.toInternalDataFile(configuration, hadoopPath);
- InternalDataFile testInternalFile =
- InternalDataFile.builder()
- .physicalPath(
- "file:"
- .concat(
-
file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
- .columnStats(testColumnStats)
- .fileFormat(FileFormat.APACHE_PARQUET)
- .lastModified(file.lastModified())
- .fileSizeBytes(file.length())
- .recordCount(8)
- .build();
-
- Assertions.assertEquals(testInternalFile, internalDataFile);
- }
+ void testAllNullColumnStats() throws IOException {
+ MessageType schema =
+ new MessageType("message",
Types.optional(PrimitiveTypeName.INT32).named("col"));
+ File file = tempDir.resolve("parquet-all-null").toFile();
+ Path path = new Path(file.toURI());
+ GroupWriteSupport.setSchema(schema, conf);
+ SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+ try (ParquetWriter<Group> writer =
ExampleParquetWriter.builder(path).withConf(conf).build()) {
+ writer.write(factory.newGroup());
+ writer.write(factory.newGroup());
+ writer.write(factory.newGroup());
+ }
- @Test
- public void testInternalDataFileBinaryStat() throws IOException {
-
- Configuration configuration = new Configuration();
-
- java.nio.file.Path path = tempDir.resolve("parquet-test-binary-file");
- File file = path.toFile();
- file.deleteOnExit();
- List<ColumnStat> testColumnStats = initBinaryFileTest(file);
- Path hadoopPath = new Path(file.toURI());
- // statsExtractor toInternalDataFile testing
- InternalDataFile internalDataFile =
- statsExtractor.toInternalDataFile(configuration, hadoopPath);
- InternalDataFile testInternalFile =
- InternalDataFile.builder()
- .physicalPath(
- "file:"
- .concat(
-
file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
- .columnStats(testColumnStats)
- .fileFormat(FileFormat.APACHE_PARQUET)
- .lastModified(file.lastModified())
- .fileSizeBytes(file.length())
- .recordCount(5)
- .build();
-
- Assertions.assertEquals(testInternalFile, internalDataFile);
+ List<ColumnStat> columnStats = readStats(path);
+ assertEquals(1, columnStats.size());
+ ColumnStat stat = columnStats.get(0);
+ assertEquals(3L, stat.getNumNulls());
+ assertNull(stat.getRange().getMinValue());
+ assertNull(stat.getRange().getMaxValue());
}
@Test
- public void testInternalDataFileIntStat() throws IOException {
-
- Configuration configuration = new Configuration();
- java.nio.file.Path path = tempDir.resolve("parquet-test-int-file");
- File file = path.toFile();
- file.deleteOnExit();
- List<ColumnStat> testColumnStats = initIntFileTest(file);
- Path hadoopPath = new Path(file.toURI());
- // statsExtractor toInternalDataFile testing
- InternalDataFile internalDataFile =
- statsExtractor.toInternalDataFile(configuration, hadoopPath);
- InternalDataFile testInternalFile =
- InternalDataFile.builder()
- .physicalPath(
- "file:"
- .concat(
- file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
- .columnStats(testColumnStats)
- .fileFormat(FileFormat.APACHE_PARQUET)
- .lastModified(file.lastModified())
- .fileSizeBytes(file.length())
- .recordCount(2)
- .build();
-
- Assertions.assertEquals(testInternalFile, internalDataFile);
- }
+ void testNumNullsIsSet() throws IOException {
+ MessageType schema =
+ new MessageType("message", Types.optional(PrimitiveTypeName.INT32).named("col"));
+ File file = tempDir.resolve("parquet-partial-null").toFile();
+ Path path = new Path(file.toURI());
+ GroupWriteSupport.setSchema(schema, conf);
+ SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path).withConf(conf).build()) {
+ Group row1 = factory.newGroup();
+ row1.add("col", 10);
+ writer.write(row1);
+ writer.write(factory.newGroup()); // null row
+ Group row3 = factory.newGroup();
+ row3.add("col", 50);
+ writer.write(row3);
+ }
- @Test
- public void testInternalDataFileBooleanStat() throws IOException {
- Configuration configuration = new Configuration();
-
- java.nio.file.Path path = tempDir.resolve("parquet-test-boolean-file");
- File file = path.toFile();
- file.deleteOnExit();
-
- List<ColumnStat> testColumnStats = initBooleanFileTest(file);
- Path hadoopPath = new Path(file.toURI());
- // statsExtractor toInternalDataFile testing
- InternalDataFile internalDataFile =
- statsExtractor.toInternalDataFile(configuration, hadoopPath);
- InternalDataFile testInternalFile =
- InternalDataFile.builder()
- .physicalPath(
- "file:"
- .concat(
- file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
- .columnStats(testColumnStats)
- .fileFormat(FileFormat.APACHE_PARQUET)
- .lastModified(file.lastModified())
- .fileSizeBytes(file.length())
- .recordCount(8)
- .build();
-
- Assertions.assertEquals(testInternalFile, internalDataFile);
+ List<ColumnStat> columnStats = readStats(path);
+ assertEquals(1, columnStats.size());
+ ColumnStat stat = columnStats.get(0);
+ assertEquals(1L, stat.getNumNulls());
+ assertEquals(10, stat.getRange().getMinValue());
+ assertEquals(50, stat.getRange().getMaxValue());
}
}