This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new 43075655 Parquet source: Expand column stats support, fix bugs in 
schema conversion (#805)
43075655 is described below

commit 430756556a24e3b3714477d33a74d1cc21b80f45
Author: Tim Brown <[email protected]>
AuthorDate: Mon Feb 23 10:55:29 2026 -0800

    Parquet source: Expand column stats support, fix bugs in schema conversion 
(#805)
    
    * fix stats extraction to be per file and handle more types
    
    * fix list and map handling
    
    * cleanup
    
    * spotless
    
    * address comments, set null count, handle null stats
    
    * remove more unused variables
---
 .../xtable/parquet/ParquetConversionSource.java    |  36 +-
 .../xtable/parquet/ParquetMetadataExtractor.java   |  24 +-
 .../parquet/ParquetPartitionValueExtractor.java    |   9 -
 .../xtable/parquet/ParquetSchemaExtractor.java     |  65 +-
 .../xtable/parquet/ParquetStatsConverterUtil.java  |  69 --
 .../xtable/parquet/ParquetStatsExtractor.java      | 182 ++---
 .../xtable/parquet/TestParquetSchemaExtractor.java | 134 +++-
 .../xtable/parquet/TestParquetStatsExtractor.java  | 859 ++++++++++++---------
 8 files changed, 755 insertions(+), 623 deletions(-)

diff --git 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
index e5625be7..61a2ba8f 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
@@ -110,7 +110,8 @@ public class ParquetConversionSource implements 
ConversionSource<Long> {
     return createInternalTableFromFile(file);
   }
 
-  private Stream<InternalDataFile> 
getInternalDataFiles(Stream<LocatedFileStatus> parquetFiles) {
+  private Stream<InternalDataFile> getInternalDataFiles(
+      Stream<LocatedFileStatus> parquetFiles, InternalSchema schema) {
     return parquetFiles.map(
         file ->
             InternalDataFile.builder()
@@ -119,35 +120,29 @@ public class ParquetConversionSource implements 
ConversionSource<Long> {
                 .fileSizeBytes(file.getLen())
                 .partitionValues(
                     partitionValueExtractor.extractPartitionValues(
-                        partitionSpecExtractor.spec(
-                            
partitionValueExtractor.extractSchemaForParquetPartitions(
-                                parquetMetadataExtractor.readParquetMetadata(
-                                    hadoopConf, file.getPath()),
-                                file.getPath().toString())),
+                        partitionSpecExtractor.spec(schema),
                         HudiPathUtils.getPartitionPath(new Path(basePath), 
file.getPath())))
                 .lastModified(file.getModificationTime())
                 .columnStats(
-                    parquetStatsExtractor.getColumnStatsForaFile(
-                        
parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath())))
+                    parquetStatsExtractor.getStatsForFile(
+                        
parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath()),
+                        schema))
                 .build());
   }
 
-  private InternalDataFile createInternalDataFileFromParquetFile(FileStatus 
parquetFile) {
+  private InternalDataFile createInternalDataFileFromParquetFile(
+      FileStatus parquetFile, InternalSchema schema) {
     return InternalDataFile.builder()
         .physicalPath(parquetFile.getPath().toString())
         .partitionValues(
             partitionValueExtractor.extractPartitionValues(
-                partitionSpecExtractor.spec(
-                    partitionValueExtractor.extractSchemaForParquetPartitions(
-                        parquetMetadataExtractor.readParquetMetadata(
-                            hadoopConf, parquetFile.getPath()),
-                        parquetFile.getPath().toString())),
-                basePath))
+                partitionSpecExtractor.spec(schema), basePath))
         .lastModified(parquetFile.getModificationTime())
         .fileSizeBytes(parquetFile.getLen())
         .columnStats(
-            parquetStatsExtractor.getColumnStatsForaFile(
-                parquetMetadataExtractor.readParquetMetadata(hadoopConf, 
parquetFile.getPath())))
+            parquetStatsExtractor.getStatsForFile(
+                parquetMetadataExtractor.readParquetMetadata(hadoopConf, 
parquetFile.getPath()),
+                schema))
         .build();
   }
 
@@ -169,7 +164,8 @@ public class ParquetConversionSource implements 
ConversionSource<Long> {
             .collect(Collectors.toList());
     InternalTable internalTable = getMostRecentTable(parquetFiles);
     for (FileStatus tableStatus : tableChangesAfter) {
-      InternalDataFile currentDataFile = 
createInternalDataFileFromParquetFile(tableStatus);
+      InternalDataFile currentDataFile =
+          createInternalDataFileFromParquetFile(tableStatus, 
internalTable.getReadSchema());
       addedInternalDataFiles.add(currentDataFile);
     }
 
@@ -198,9 +194,9 @@ public class ParquetConversionSource implements 
ConversionSource<Long> {
   @Override
   public InternalSnapshot getCurrentSnapshot() {
     // to avoid consume the stream call the method twice to return the same 
stream of parquet files
-    Stream<InternalDataFile> internalDataFiles =
-        getInternalDataFiles(getParquetFiles(hadoopConf, basePath));
     InternalTable table = getMostRecentTable(getParquetFiles(hadoopConf, 
basePath));
+    Stream<InternalDataFile> internalDataFiles =
+        getInternalDataFiles(getParquetFiles(hadoopConf, basePath), 
table.getReadSchema());
     return InternalSnapshot.builder()
         .table(table)
         .sourceIdentifier(
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
index f29a186d..f022b753 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
@@ -20,6 +20,9 @@ package org.apache.xtable.parquet;
 
 import java.io.IOException;
 
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.HadoopReadOptions;
@@ -27,11 +30,11 @@ import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.io.InputFile;
 import org.apache.parquet.schema.MessageType;
 
 import org.apache.xtable.exception.ReadException;
 
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class ParquetMetadataExtractor {
 
   private static final ParquetMetadataExtractor INSTANCE = new 
ParquetMetadataExtractor();
@@ -40,23 +43,16 @@ public class ParquetMetadataExtractor {
     return INSTANCE;
   }
 
-  public static MessageType getSchema(ParquetMetadata footer) {
-    MessageType schema = footer.getFileMetaData().getSchema();
-    return schema;
+  public MessageType getSchema(ParquetMetadata footer) {
+    return footer.getFileMetaData().getSchema();
   }
 
-  public static ParquetMetadata readParquetMetadata(Configuration conf, Path 
filePath) {
-    InputFile file = null;
-    try {
-      file = HadoopInputFile.fromPath(filePath, conf);
-    } catch (IOException e) {
-      throw new ReadException("Failed to read the parquet file", e);
-    }
-
+  public ParquetMetadata readParquetMetadata(Configuration conf, Path 
filePath) {
     ParquetReadOptions options = HadoopReadOptions.builder(conf, 
filePath).build();
-    try (ParquetFileReader fileReader = ParquetFileReader.open(file, options)) 
{
+    try (ParquetFileReader fileReader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(filePath, conf), 
options)) {
       return fileReader.getFooter();
-    } catch (Exception e) {
+    } catch (IOException e) {
       throw new ReadException("Failed to read the parquet file", e);
     }
   }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
index 63b35846..19c2f705 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
@@ -28,13 +28,9 @@ import java.util.TimeZone;
 
 import lombok.NonNull;
 
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.MessageType;
-
 import org.apache.xtable.exception.PartitionValuesExtractorException;
 import org.apache.xtable.hudi.PathBasedPartitionValuesExtractor;
 import org.apache.xtable.model.schema.InternalPartitionField;
-import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.stat.PartitionValue;
 import org.apache.xtable.model.stat.Range;
 
@@ -134,9 +130,4 @@ public class ParquetPartitionValueExtractor extends 
PathBasedPartitionValuesExtr
   public static ParquetPartitionValueExtractor getInstance() {
     return INSTANCE;
   }
-
-  public InternalSchema extractSchemaForParquetPartitions(ParquetMetadata 
footer, String path) {
-    MessageType parquetSchema = parquetMetadataExtractor.getSchema(footer);
-    return schemaExtractor.toInternalSchema(parquetSchema, path);
-  }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
index ed6cf284..48144287 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
@@ -61,12 +61,9 @@ public class ParquetSchemaExtractor {
                   .dataType(InternalType.STRING)
                   .isNullable(false)
                   .build())
-          .defaultValue("")
           .build();
   private static final ParquetSchemaExtractor INSTANCE = new 
ParquetSchemaExtractor();
-  private static final String ELEMENT = "element";
-  private static final String KEY = "key";
-  private static final String VALUE = "value";
+  private static final String LIST = "list";
 
   public static ParquetSchemaExtractor getInstance() {
     return INSTANCE;
@@ -86,7 +83,6 @@ public class ParquetSchemaExtractor {
    */
   public InternalSchema toInternalSchema(Type schema, String parentPath) {
     InternalType newDataType = null;
-    Type.Repetition currentRepetition = null;
     List<InternalField> subFields = null;
     PrimitiveType primitiveType;
     LogicalTypeAnnotation logicalType;
@@ -195,6 +191,10 @@ public class ParquetSchemaExtractor {
                 InternalSchema.MetadataKey.DECIMAL_SCALE,
                 ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) 
logicalType).getScale());
             newDataType = InternalType.DECIMAL;
+          } else {
+            newDataType = InternalType.FIXED;
+            metadata.put(
+                InternalSchema.MetadataKey.FIXED_BYTES_SIZE, 
primitiveType.getTypeLength());
           }
           break;
         case BINARY:
@@ -233,11 +233,18 @@ public class ParquetSchemaExtractor {
       // GroupTypes
       logicalType = schema.getLogicalTypeAnnotation();
       if (logicalType instanceof 
LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
-        String schemaName = schema.asGroupType().getName();
-        Type.ID schemaId = schema.getId();
+        Type elementType;
+        // check if 3-level or 2-level list encoding
+        if (schema.asGroupType().getFieldCount() == 1
+            && !schema.asGroupType().getType(0).isPrimitive()
+            && 
schema.asGroupType().getType(0).asGroupType().getName().equals(LIST)) {
+          elementType = 
schema.asGroupType().getType(0).asGroupType().getType(0);
+        } else {
+          elementType = schema.asGroupType().getType(0);
+        }
         InternalSchema elementSchema =
             toInternalSchema(
-                schema.asGroupType().getType(0),
+                elementType,
                 SchemaUtils.getFullyQualifiedPath(
                     parentPath, 
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME));
         InternalField elementField =
@@ -245,7 +252,7 @@ public class ParquetSchemaExtractor {
                 .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
                 .parentPath(parentPath)
                 .schema(elementSchema)
-                .fieldId(schemaId == null ? null : schemaId.intValue())
+                .fieldId(elementType.getId() == null ? null : 
elementType.getId().intValue())
                 .build();
         return InternalSchema.builder()
             .name(schema.getName())
@@ -256,10 +263,25 @@ public class ParquetSchemaExtractor {
             .build();
       } else if (logicalType instanceof 
LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
         String schemaName = schema.asGroupType().getName();
-        Type.ID schemaId = schema.getId();
+        List<Type> keyAndValueTypes =
+            schema.asGroupType().getFields().get(0).asGroupType().getFields();
+        Type keyType = keyAndValueTypes.get(0);
+        InternalSchema keySchema =
+            toInternalSchema(
+                keyType,
+                SchemaUtils.getFullyQualifiedPath(
+                    parentPath, InternalField.Constants.MAP_KEY_FIELD_NAME));
+        InternalField keyField =
+            MAP_KEY_FIELD.toBuilder()
+                .parentPath(parentPath)
+                .schema(keySchema)
+                .fieldId(keyType.getId() == null ? null : 
keyType.getId().intValue())
+                .build();
+
+        Type valueType = keyAndValueTypes.get(1);
         InternalSchema valueSchema =
             toInternalSchema(
-                schema.asGroupType().getType(0),
+                valueType,
                 SchemaUtils.getFullyQualifiedPath(
                     parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME));
         InternalField valueField =
@@ -267,14 +289,14 @@ public class ParquetSchemaExtractor {
                 .name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
                 .parentPath(parentPath)
                 .schema(valueSchema)
-                .fieldId(schemaId == null ? null : schemaId.intValue())
+                .fieldId(valueType.getId() == null ? null : 
valueType.getId().intValue())
                 .build();
         return InternalSchema.builder()
             .name(schemaName)
             .dataType(InternalType.MAP)
             .comment(null)
             .isNullable(isNullable(schema.asGroupType()))
-            .fields(valueSchema.getFields())
+            .fields(Arrays.asList(keyField, valueField))
             .build();
       } else {
         subFields = new ArrayList<>(schema.asGroupType().getFields().size());
@@ -285,13 +307,6 @@ public class ParquetSchemaExtractor {
               toInternalSchema(
                   parquetField, SchemaUtils.getFullyQualifiedPath(parentPath, 
fieldName));
 
-          if (schema.asGroupType().getFields().size()
-              == 1) { // TODO Tuple (many subelements in a list)
-            newDataType = subFieldSchema.getDataType();
-            elementName = subFieldSchema.getName();
-            // subFields = subFieldSchema.getFields();
-            break;
-          }
           subFields.add(
               InternalField.builder()
                   .parentPath(parentPath)
@@ -338,10 +353,6 @@ public class ParquetSchemaExtractor {
    */
   public Type fromInternalSchema(InternalSchema internalSchema, String 
currentPath) {
     Type type = null;
-    Type listType = null;
-    Type mapType = null;
-    Type mapKeyType = null;
-    Type mapValueType = null;
     String fieldName = internalSchema.getName();
     InternalType internalType = internalSchema.getDataType();
     switch (internalType) {
@@ -459,7 +470,7 @@ public class ParquetSchemaExtractor {
                         
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(field.getName()))
                 .findFirst()
                 .orElseThrow(() -> new SchemaExtractorException("Invalid array 
schema"));
-        listType = fromInternalSchema(elementField.getSchema(), 
elementField.getPath());
+        Type listType = fromInternalSchema(elementField.getSchema(), 
elementField.getPath());
         type = 
Types.requiredList().setElementType(listType).named(internalSchema.getName());
         // TODO nullable lists
         break;
@@ -475,8 +486,8 @@ public class ParquetSchemaExtractor {
                     field -> 
InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(field.getName()))
                 .findFirst()
                 .orElseThrow(() -> new SchemaExtractorException("Invalid map 
schema"));
-        mapKeyType = fromInternalSchema(keyField.getSchema(), 
valueField.getPath());
-        mapValueType = fromInternalSchema(valueField.getSchema(), 
valueField.getPath());
+        Type mapKeyType = fromInternalSchema(keyField.getSchema(), 
valueField.getPath());
+        Type mapValueType = fromInternalSchema(valueField.getSchema(), 
valueField.getPath());
         type =
             
Types.requiredMap().key(mapKeyType).value(mapValueType).named(internalSchema.getName());
         // TODO nullable lists
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java
 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java
deleted file mode 100644
index e5fd2d07..00000000
--- 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- 
-package org.apache.xtable.parquet;
-
-import java.nio.charset.StandardCharsets;
-
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.PrimitiveType;
-
-public class ParquetStatsConverterUtil {
-  public static Object convertStatBinaryTypeToLogicalType(
-      ColumnChunkMetaData columnMetaData, boolean isMin) {
-    Object returnedObj = null;
-    PrimitiveType primitiveType = columnMetaData.getPrimitiveType();
-    switch (primitiveType.getPrimitiveTypeName()) {
-      case BINARY: // TODO check if other primitiveType' needs to be handled 
as well
-        if (primitiveType.getLogicalTypeAnnotation() != null) {
-          if (columnMetaData
-              .getPrimitiveType()
-              .getLogicalTypeAnnotation()
-              .toString()
-              .equals("STRING")) {
-            returnedObj =
-                new String(
-                    (isMin
-                            ? (Binary) 
columnMetaData.getStatistics().genericGetMin()
-                            : (Binary) 
columnMetaData.getStatistics().genericGetMax())
-                        .getBytes(),
-                    StandardCharsets.UTF_8);
-          } else {
-            returnedObj =
-                isMin
-                    ? columnMetaData.getStatistics().genericGetMin()
-                    : columnMetaData.getStatistics().genericGetMax();
-          }
-        } else {
-          returnedObj =
-              isMin
-                  ? columnMetaData.getStatistics().genericGetMin()
-                  : columnMetaData.getStatistics().genericGetMax();
-        }
-        break;
-      default:
-        returnedObj =
-            isMin
-                ? columnMetaData.getStatistics().genericGetMin()
-                : columnMetaData.getStatistics().genericGetMax();
-        // TODO JSON and DECIMAL... of BINARY primitiveType
-    }
-    return returnedObj;
-  }
-}
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
index 46082353..b6f0f330 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
@@ -18,33 +18,30 @@
  
 package org.apache.xtable.parquet;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import lombok.Builder;
 import lombok.Value;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 
-import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
 import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.stat.ColumnStat;
-import org.apache.xtable.model.stat.PartitionValue;
 import org.apache.xtable.model.stat.Range;
-import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.schema.SchemaFieldFinder;
 
 @Value
 @Builder
@@ -52,9 +49,6 @@ public class ParquetStatsExtractor {
 
   private static final ParquetStatsExtractor INSTANCE = new 
ParquetStatsExtractor();
 
-  private static final ParquetSchemaExtractor schemaExtractor =
-      ParquetSchemaExtractor.getInstance();
-
   private static final ParquetMetadataExtractor parquetMetadataExtractor =
       ParquetMetadataExtractor.getInstance();
 
@@ -62,97 +56,79 @@ public class ParquetStatsExtractor {
     return INSTANCE;
   }
 
-  private static final ParquetPartitionValueExtractor partitionValueExtractor =
-      ParquetPartitionValueExtractor.getInstance();
-  private static PathBasedPartitionSpecExtractor partitionSpecExtractor =
-      ParquetPartitionSpecExtractor.getInstance();
-
-  public static List<ColumnStat> getColumnStatsForaFile(ParquetMetadata 
footer) {
-    return getStatsForFile(footer).values().stream()
-        .flatMap(List::stream)
-        .collect(Collectors.toList());
-  }
+  @SuppressWarnings("unchecked")
+  private static final Comparator<Object> COMPARABLE_COMPARATOR =
+      (a, b) -> ((Comparable<Object>) a).compareTo(b);
 
-  private static Optional<Long> getMaxFromColumnStats(List<ColumnStat> 
columnStats) {
-    return columnStats.stream()
-        .filter(entry -> entry.getField().getParentPath() == null)
-        .map(ColumnStat::getNumValues)
-        .filter(numValues -> numValues > 0)
-        .max(Long::compareTo);
+  private static ColumnStat mergeColumnChunks(
+      List<ColumnChunkMetaData> chunks, InternalSchema internalSchema) {
+    ColumnChunkMetaData first = chunks.get(0);
+    String dotStringPath = first.getPath().toDotString();
+    InternalField internalField =
+        SchemaFieldFinder.getInstance().findFieldByPath(internalSchema, 
dotStringPath);
+    Objects.requireNonNull(internalField, "No field found for path: " + 
dotStringPath);
+    PrimitiveType primitiveType = first.getPrimitiveType();
+    long totalNumValues = 
chunks.stream().mapToLong(ColumnChunkMetaData::getValueCount).sum();
+    long totalSize = 
chunks.stream().mapToLong(ColumnChunkMetaData::getTotalSize).sum();
+    long totalNullValues =
+        chunks.stream()
+            .map(ColumnChunkMetaData::getStatistics)
+            .mapToLong(Statistics::getNumNulls)
+            .sum();
+    Object globalMin =
+        chunks.stream()
+            .filter(c -> c.getStatistics().hasNonNullValue())
+            .map(c -> convertStatsToInternalType(primitiveType, 
c.getStatistics().genericGetMin()))
+            .min(COMPARABLE_COMPARATOR)
+            .orElse(null);
+    Object globalMax =
+        chunks.stream()
+            .filter(c -> c.getStatistics().hasNonNullValue())
+            .map(c -> convertStatsToInternalType(primitiveType, 
c.getStatistics().genericGetMax()))
+            .max(COMPARABLE_COMPARATOR)
+            .orElse(null);
+    return ColumnStat.builder()
+        .field(internalField)
+        .numValues(totalNumValues)
+        .numNulls(totalNullValues)
+        .totalSize(totalSize)
+        .range(Range.vector(globalMin, globalMax))
+        .build();
   }
 
-  public static Map<ColumnDescriptor, List<ColumnStat>> 
getStatsForFile(ParquetMetadata footer) {
-    Map<ColumnDescriptor, List<ColumnStat>> columnDescStats = new HashMap<>();
+  public static List<ColumnStat> getStatsForFile(
+      ParquetMetadata footer, InternalSchema internalSchema) {
     MessageType schema = parquetMetadataExtractor.getSchema(footer);
-    List<ColumnChunkMetaData> columns = new ArrayList<>();
-    columns =
-        footer.getBlocks().stream()
-            .flatMap(blockMetaData -> blockMetaData.getColumns().stream())
-            .collect(Collectors.toList());
-    columnDescStats =
-        columns.stream()
-            .collect(
-                Collectors.groupingBy(
-                    columnMetaData ->
-                        
schema.getColumnDescription(columnMetaData.getPath().toArray()),
-                    Collectors.mapping(
-                        columnMetaData ->
-                            ColumnStat.builder()
-                                .field(
-                                    InternalField.builder()
-                                        
.name(columnMetaData.getPrimitiveType().getName())
-                                        .fieldId(
-                                            
columnMetaData.getPrimitiveType().getId() == null
-                                                ? null
-                                                : columnMetaData
-                                                    .getPrimitiveType()
-                                                    .getId()
-                                                    .intValue())
-                                        .parentPath(null)
-                                        .schema(
-                                            schemaExtractor.toInternalSchema(
-                                                
columnMetaData.getPrimitiveType(),
-                                                
columnMetaData.getPath().toDotString()))
-                                        .build())
-                                .numValues(columnMetaData.getValueCount())
-                                .totalSize(columnMetaData.getTotalSize())
-                                .range(
-                                    Range.vector(
-                                        ParquetStatsConverterUtil
-                                            
.convertStatBinaryTypeToLogicalType(
-                                                columnMetaData,
-                                                true), // if stats are string 
convert to
-                                        // litteraly a string stat and
-                                        // store to range
-                                        ParquetStatsConverterUtil
-                                            
.convertStatBinaryTypeToLogicalType(
-                                                columnMetaData, false)))
-                                .build(),
-                        Collectors.toList())));
-    return columnDescStats;
+    return footer.getBlocks().stream()
+        .flatMap(block -> block.getColumns().stream())
+        .collect(
+            Collectors.groupingBy(chunk -> 
schema.getColumnDescription(chunk.getPath().toArray())))
+        .values()
+        .stream()
+        .map(columnChunks -> mergeColumnChunks(columnChunks, internalSchema))
+        .collect(Collectors.toList());
   }
 
-  public static InternalDataFile toInternalDataFile(Configuration hadoopConf, 
Path parentPath)
-      throws IOException {
-    FileSystem fs = FileSystem.get(hadoopConf);
-    FileStatus file = fs.getFileStatus(parentPath);
-    ParquetMetadata footer = 
parquetMetadataExtractor.readParquetMetadata(hadoopConf, parentPath);
-    List<ColumnStat> columnStatsForAFile = getColumnStatsForaFile(footer);
-    List<PartitionValue> partitionValues =
-        partitionValueExtractor.extractPartitionValues(
-            partitionSpecExtractor.spec(
-                partitionValueExtractor.extractSchemaForParquetPartitions(
-                    parquetMetadataExtractor.readParquetMetadata(hadoopConf, 
file.getPath()),
-                    file.getPath().toString())),
-            parentPath.toString());
-    return InternalDataFile.builder()
-        .physicalPath(parentPath.toString())
-        .fileFormat(FileFormat.APACHE_PARQUET)
-        .partitionValues(partitionValues)
-        .fileSizeBytes(file.getLen())
-        .recordCount(getMaxFromColumnStats(columnStatsForAFile).orElse(0L))
-        .columnStats(columnStatsForAFile)
-        .lastModified(file.getModificationTime())
-        .build();
+  private static Object convertStatsToInternalType(PrimitiveType 
primitiveType, Object value) {
+    LogicalTypeAnnotation annotation = 
primitiveType.getLogicalTypeAnnotation();
+
+    // DECIMAL: convert unscaled backing value → BigDecimal regardless of 
primitive type
+    if (annotation instanceof 
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+      int scale = ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) 
annotation).getScale();
+      switch (primitiveType.getPrimitiveTypeName()) {
+        case INT32:
+        case INT64:
+          return new BigDecimal(BigInteger.valueOf(((Number) 
value).longValue()), scale);
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+          return new BigDecimal(new BigInteger(((Binary) value).getBytes()), 
scale);
+        default:
+          return value;
+      }
+    } else if (annotation instanceof 
LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
+      // STRING: convert binary → String
+      return new String(((Binary) value).getBytes(), StandardCharsets.UTF_8);
+    }
+    return value;
   }
 }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
 
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
index c5d196fc..8dbceaa5 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
@@ -32,6 +33,9 @@ import org.apache.parquet.schema.Type.Repetition;
 import org.apache.parquet.schema.Types;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalSchema;
@@ -82,6 +86,20 @@ public class TestParquetSchemaExtractor {
 
     Assertions.assertEquals(decimalType, 
schemaExtractor.toInternalSchema(decimalPrimitive, null));
 
+    // test fixed size byte array
+    Map<InternalSchema.MetadataKey, Object> fixedMetadata =
+        Collections.singletonMap(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, 
16);
+    InternalSchema fixedType =
+        InternalSchema.builder()
+            .name("fixed")
+            .dataType(InternalType.FIXED)
+            .isNullable(false)
+            .metadata(fixedMetadata)
+            .build();
+    Type fixedPrimitiveType =
+        
Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(16).named("fixed");
+    Assertions.assertEquals(fixedType, 
schemaExtractor.toInternalSchema(fixedPrimitiveType, null));
+
     // tests for timestamp and date
     InternalSchema testDate =
         
InternalSchema.builder().name("date").dataType(InternalType.DATE).isNullable(false).build();
@@ -124,12 +142,18 @@ public class TestParquetSchemaExtractor {
         Types.required(PrimitiveTypeName.INT64)
             .as(LogicalTypeAnnotation.timestampType(false, 
LogicalTypeAnnotation.TimeUnit.MILLIS))
             .named("timestamp_millis");
+    Type timestampMicrosPrimitiveType =
+        Types.required(PrimitiveTypeName.INT64)
+            .as(LogicalTypeAnnotation.timestampType(true, 
LogicalTypeAnnotation.TimeUnit.MICROS))
+            .named("timestamp_micros");
     Type timestampNanosPrimitiveType =
         Types.required(PrimitiveTypeName.INT64)
             .as(LogicalTypeAnnotation.timestampType(false, 
LogicalTypeAnnotation.TimeUnit.NANOS))
             .named("timestamp_nanos");
     Assertions.assertEquals(
         testTimestampMillis, 
schemaExtractor.toInternalSchema(timestampMillisPrimitiveType, null));
+    Assertions.assertEquals(
+        testTimestampMicros, 
schemaExtractor.toInternalSchema(timestampMicrosPrimitiveType, null));
     Assertions.assertEquals(
         testTimestampNanos, 
schemaExtractor.toInternalSchema(timestampNanosPrimitiveType, null));
 
@@ -142,9 +166,7 @@ public class TestParquetSchemaExtractor {
 
   @Test
   public void testGroupTypes() {
-
     // map
-
     InternalSchema internalMap =
         InternalSchema.builder()
             .name("map")
@@ -153,8 +175,8 @@ public class TestParquetSchemaExtractor {
             .fields(
                 Arrays.asList(
                     InternalField.builder()
-                        .name("key")
-                        .parentPath("_one_field_value")
+                        .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+                        .parentPath(null)
                         .schema(
                             InternalSchema.builder()
                                 .name("key")
@@ -164,8 +186,8 @@ public class TestParquetSchemaExtractor {
                         .defaultValue(null)
                         .build(),
                     InternalField.builder()
-                        .name("value")
-                        .parentPath("_one_field_value")
+                        .name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
+                        .parentPath(null)
                         .schema(
                             InternalSchema.builder()
                                 .name("value")
@@ -278,6 +300,40 @@ public class TestParquetSchemaExtractor {
             .isNullable(false)
             .fields(
                 Arrays.asList(
+                    InternalField.builder()
+                        .name("map")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("map")
+                                .isNullable(false)
+                                .dataType(InternalType.MAP)
+                                .fields(
+                                    Arrays.asList(
+                                        InternalField.builder()
+                                            
.name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+                                            .parentPath("map")
+                                            .schema(
+                                                InternalSchema.builder()
+                                                    .name("key")
+                                                    
.dataType(InternalType.FLOAT)
+                                                    .isNullable(false)
+                                                    .build())
+                                            .defaultValue(null)
+                                            .build(),
+                                        InternalField.builder()
+                                            .name(
+                                                InternalField.Constants
+                                                    .MAP_VALUE_FIELD_NAME) // 
"value")
+                                            .parentPath("map")
+                                            .schema(
+                                                InternalSchema.builder()
+                                                    .name("value")
+                                                    .dataType(InternalType.INT)
+                                                    .isNullable(false)
+                                                    .build())
+                                            .build()))
+                                .build())
+                        .build(),
                     InternalField.builder()
                         .name("my_list")
                         .schema(
@@ -294,7 +350,7 @@ public class TestParquetSchemaExtractor {
                                                 InternalSchema.builder()
                                                     .name("element")
                                                     .dataType(InternalType.INT)
-                                                    .isNullable(true)
+                                                    .isNullable(false)
                                                     .build())
                                             .build()))
                                 .build())
@@ -331,7 +387,7 @@ public class TestParquetSchemaExtractor {
             .named("my_list");
     MessageType messageType =
         Types.buildMessage()
-            // .addField(testMap)
+            .addField(testMap)
             .addField(listType)
             .addField(testGroupType)
             .named("my_record");
@@ -339,4 +395,66 @@ public class TestParquetSchemaExtractor {
     Assertions.assertEquals(internalMap, 
schemaExtractor.toInternalSchema(testMap, null));
     Assertions.assertEquals(internalSchema, 
schemaExtractor.toInternalSchema(messageType, null));
   }
+
+  static Stream<Arguments> listEncodings() {
+    return Stream.of(
+        Arguments.of(
+            "3-level required element",
+            Types.requiredList()
+                
.element(Types.required(PrimitiveTypeName.INT32).named("element"))
+                .named("my_list"),
+            false),
+        Arguments.of(
+            "3-level optional element",
+            Types.requiredList()
+                
.element(Types.optional(PrimitiveTypeName.INT32).named("element"))
+                .named("my_list"),
+            true),
+        Arguments.of(
+            "2-level required element",
+            Types.buildGroup(Type.Repetition.REQUIRED)
+                .as(LogicalTypeAnnotation.listType())
+                .addField(
+                    Types.primitive(PrimitiveTypeName.INT32, 
Type.Repetition.REQUIRED)
+                        .named("element"))
+                .named("my_list"),
+            false),
+        Arguments.of(
+            "2-level optional element",
+            Types.buildGroup(Type.Repetition.REQUIRED)
+                .as(LogicalTypeAnnotation.listType())
+                .addField(
+                    Types.primitive(PrimitiveTypeName.INT32, 
Type.Repetition.OPTIONAL)
+                        .named("element"))
+                .named("my_list"),
+            true));
+  }
+
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("listEncodings")
+  void testListEncoding(String description, GroupType parquetSchema, boolean 
elementNullable) {
+    InternalSchema expected = listSchema(elementNullable);
+    Assertions.assertEquals(expected, 
schemaExtractor.toInternalSchema(parquetSchema, null));
+  }
+
+  /** Builds the expected InternalSchema for a required LIST of INT32 
elements. */
+  private static InternalSchema listSchema(boolean elementNullable) {
+    return InternalSchema.builder()
+        .name("my_list")
+        .dataType(InternalType.LIST)
+        .isNullable(false)
+        .fields(
+            Collections.singletonList(
+                InternalField.builder()
+                    .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+                    .parentPath(null)
+                    .schema(
+                        InternalSchema.builder()
+                            .name("element")
+                            .dataType(InternalType.INT)
+                            .isNullable(elementNullable)
+                            .build())
+                    .build()))
+        .build();
+  }
 }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
 
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
index 5047827f..75c7f01c 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
@@ -20,436 +20,549 @@ package org.apache.xtable.parquet;
 
 import static org.apache.parquet.column.Encoding.BIT_PACKED;
 import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 import java.io.File;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.statistics.BinaryStatistics;
-import org.apache.parquet.column.statistics.BooleanStatistics;
 import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
-import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
-import org.apache.parquet.schema.Type.Repetition;
-import org.junit.jupiter.api.Assertions;
+import org.apache.parquet.schema.Types;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
-import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
 import org.apache.xtable.model.stat.ColumnStat;
-import org.apache.xtable.model.stat.Range;
-import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.InternalDataFile;
 
-public class TestParquetStatsExtractor {
-
-  private static final ParquetSchemaExtractor schemaExtractor =
-      ParquetSchemaExtractor.getInstance();
-  private static final ParquetStatsExtractor statsExtractor = 
ParquetStatsExtractor.getInstance();
+class TestParquetStatsExtractor {
 
+  private final Configuration conf = new Configuration();
   @TempDir static java.nio.file.Path tempDir = Paths.get("./");
 
-  public static List<ColumnStat> initBooleanFileTest(File file) throws 
IOException {
-    // create the parquet file by parsing a schema
+  /** Write a two-row single-column Parquet file; Parquet computes min/max 
automatically. */
+  private Path writeParquetFile(File file, MessageType schema, Object minVal, 
Object maxVal)
+      throws IOException {
     Path path = new Path(file.toURI());
-    Configuration configuration = new Configuration();
-
-    MessageType schema =
-        MessageTypeParser.parseMessageType("message m { required group a 
{required boolean b;}}");
-    String[] columnPath = {"a", "b"};
-    ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
-    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
-    BooleanStatistics stats = new BooleanStatistics();
-    stats.updateStats(true);
-    stats.updateStats(false);
-
-    // write the string columned file
-
-    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
-    w.start();
-    w.startBlock(3);
-    w.startColumn(c1, 5, codec);
-    w.writeDataPage(2, 4, BytesInput.fromInt(1), stats, BIT_PACKED, 
BIT_PACKED, PLAIN);
-    w.writeDataPage(3, 4, BytesInput.fromInt(0), stats, BIT_PACKED, 
BIT_PACKED, PLAIN);
-    w.endColumn();
-    w.endBlock();
-    w.startBlock(4);
-    w.startColumn(c1, 8, codec);
-
-    w.writeDataPage(7, 4, BytesInput.fromInt(0), stats, BIT_PACKED, 
BIT_PACKED, PLAIN);
-    w.endColumn();
-    w.endBlock();
-    w.end(new HashMap<String, String>());
-
-    // reconstruct the stats for the InternalDataFile testing object
-
-    boolean minStat = stats.genericGetMin();
-
-    boolean maxStat = stats.genericGetMax();
-    PrimitiveType primitiveType =
-        new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BOOLEAN, "b");
-    List<Integer> col1NumValTotSize =
-        new ArrayList<>(Arrays.asList(5, 8)); // (5, 8)// start column indexes
-    List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(54, 27));
-    List<ColumnStat> testColumnStats = new ArrayList<>();
-    String[] columnDotPath = {"a.b", "a.b"};
-    for (int i = 0; i < columnDotPath.length; i++) {
-      testColumnStats.add(
-          ColumnStat.builder()
-              .field(
-                  InternalField.builder()
-                      .name(primitiveType.getName())
-                      .parentPath(null)
-                      .schema(schemaExtractor.toInternalSchema(primitiveType, 
columnDotPath[i]))
-                      .build())
-              .numValues(col1NumValTotSize.get(i))
-              .totalSize(col2NumValTotSize.get(i))
-              .range(Range.vector(minStat, maxStat))
-              .build());
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+    String fieldName = schema.getFields().get(0).getName();
+    try (ParquetWriter<Group> writer = 
ExampleParquetWriter.builder(path).withConf(conf).build()) {
+      writer.write(addValue(factory.newGroup(), fieldName, minVal));
+      writer.write(addValue(factory.newGroup(), fieldName, maxVal));
     }
-
-    return testColumnStats;
+    return path;
   }
 
-  public static List<ColumnStat> initStringFileTest(File file) throws 
IOException {
-    // create the parquet file by parsing a schema
-    Path path = new Path(file.toURI());
-    Configuration configuration = new Configuration();
-
-    MessageType schema =
-        MessageTypeParser.parseMessageType(
-            "message m { required group a {required fixed_len_byte_array(10) 
b;}}");
-    String[] columnPath = {"a", "b"};
-    ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
-    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
-    BinaryStatistics stats = new BinaryStatistics();
-    stats.updateStats(Binary.fromString("1"));
-    stats.updateStats(Binary.fromString("2"));
-    stats.updateStats(Binary.fromString("5"));
-
-    byte[] bytes1 = "First string".getBytes();
-    byte[] bytes2 = "Second string".getBytes();
-
-    // write the string columned file
-
-    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
-    w.start();
-    w.startBlock(3);
-    w.startColumn(c1, 5, codec);
-
-    w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED, 
BIT_PACKED, PLAIN);
-    w.writeDataPage(3, 4, BytesInput.from(bytes2), stats, BIT_PACKED, 
BIT_PACKED, PLAIN);
-    w.endColumn();
-    w.endBlock();
-    w.startBlock(4);
-    w.startColumn(c1, 8, codec);
-
-    w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED, 
BIT_PACKED, PLAIN);
-    w.endColumn();
-    w.endBlock();
-    w.end(new HashMap<String, String>());
-
-    // reconstruct the stats for the InternalDataFile testing object
-    BinaryStatistics stats_clone = new BinaryStatistics();
-    Binary originalBinary1 = Binary.fromString("1");
-    Binary originalBinary2 = Binary.fromString("2");
-    Binary originalBinary5 = Binary.fromString("5");
-    stats_clone.updateStats(Binary.fromByteArray(originalBinary1.getBytes()));
-    stats_clone.updateStats(Binary.fromByteArray(originalBinary2.getBytes()));
-    stats_clone.updateStats(Binary.fromByteArray(originalBinary5.getBytes()));
-
-    Binary minStat = stats_clone.genericGetMin();
-
-    Binary maxStat = stats_clone.genericGetMax();
-    PrimitiveType primitiveType =
-        new PrimitiveType(Repetition.REQUIRED, 
PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, "b");
-    List<Integer> col1NumValTotSize = new ArrayList<>(Arrays.asList(5, 8));
-    List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(71, 36));
-    List<ColumnStat> testColumnStats = new ArrayList<>();
-    String[] columnDotPath = {"a.b", "a.b"};
-    for (int i = 0; i < columnDotPath.length; i++) {
-      testColumnStats.add(
-          ColumnStat.builder()
-              .field(
-                  InternalField.builder()
-                      .name(primitiveType.getName())
-                      .parentPath(null)
-                      .schema(schemaExtractor.toInternalSchema(primitiveType, 
columnDotPath[i]))
-                      .build())
-              .numValues(col1NumValTotSize.get(i))
-              .totalSize(col2NumValTotSize.get(i))
-              .range(Range.vector(minStat, maxStat))
-              .build());
+  private static Group addValue(Group group, String fieldName, Object value) {
+    if (value instanceof Integer) {
+      group.add(fieldName, (Integer) value);
+    } else if (value instanceof Long) {
+      group.add(fieldName, (Long) value);
+    } else if (value instanceof Float) {
+      group.add(fieldName, (Float) value);
+    } else if (value instanceof Double) {
+      group.add(fieldName, (Double) value);
+    } else if (value instanceof Boolean) {
+      group.add(fieldName, (Boolean) value);
+    } else if (value instanceof Binary) {
+      group.add(fieldName, (Binary) value);
+    } else {
+      throw new IllegalArgumentException("Unsupported: " + value.getClass());
     }
+    return group;
+  }
 
-    return testColumnStats;
+  /** Read all column stats from a written file. */
+  private List<ColumnStat> readStats(Path path) {
+    ParquetMetadata footer = 
ParquetMetadataExtractor.getInstance().readParquetMetadata(conf, path);
+    return ParquetStatsExtractor.getStatsForFile(
+        footer,
+        ParquetSchemaExtractor.getInstance()
+            .toInternalSchema(footer.getFileMetaData().getSchema(), ""));
   }
 
-  public static List<ColumnStat> initBinaryFileTest(File file) throws 
IOException {
-    // create the parquet file by parsing a schema
-    Path path = new Path(file.toURI());
-    Configuration configuration = new Configuration();
+  private static IntStatistics intStats(int min, int max) {
+    IntStatistics s = new IntStatistics();
+    s.updateStats(min);
+    s.updateStats(max);
+    return s;
+  }
 
-    MessageType schema =
-        MessageTypeParser.parseMessageType("message m { required group a 
{required binary b;}}");
-    String[] columnPath = {"a", "b"};
-    ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
-
-    byte[] bytes1 = {0, 1, 2, 3};
-    byte[] bytes2 = {2, 3, 4, 5};
-    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
-    BinaryStatistics stats = new BinaryStatistics();
-    stats.updateStats(Binary.fromString("1"));
-    stats.updateStats(Binary.fromString("2"));
-    stats.updateStats(Binary.fromString("5"));
-
-    // to simplify the test we keep the same stats for both columns
-    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
-    w.start();
-    w.startBlock(3);
-    w.startColumn(c1, 5, codec);
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("primitiveTypeTestCases")
+  void testPrimitiveTypeStats(
+      String name,
+      MessageType schema,
+      Object minWrite,
+      Object maxWrite,
+      InternalType expectedType,
+      Object expectedMin,
+      Object expectedMax)
+      throws IOException {
+    File file = tempDir.resolve("parquet-test-" + name).toFile();
+    Path path = writeParquetFile(file, schema, minWrite, maxWrite);
+    List<ColumnStat> columnStats = readStats(path);
+    assertEquals(1, columnStats.size());
+    ColumnStat stat = columnStats.get(0);
+    assertEquals(expectedType, stat.getField().getSchema().getDataType());
+    assertEquals(expectedMin, stat.getRange().getMinValue());
+    assertEquals(expectedMax, stat.getRange().getMaxValue());
+  }
 
-    w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED, 
BIT_PACKED, PLAIN);
+  static Stream<Arguments> primitiveTypeTestCases() {
+    return Stream.of(
+        // BOOLEAN
+        Arguments.of(
+            "boolean",
+            new MessageType("m", 
Types.required(PrimitiveTypeName.BOOLEAN).named("b")),
+            true, // minWrite
+            false, // maxWrite — Parquet stores {false, true} as {min=false, 
max=true}
+            InternalType.BOOLEAN,
+            false,
+            true),
+
+        // INT32 plain (no logical type) → INT
+        Arguments.of(
+            "int32-plain",
+            new MessageType("m", 
Types.required(PrimitiveTypeName.INT32).named("b")),
+            1,
+            100,
+            InternalType.INT,
+            1,
+            100),
+
+        // INT64 plain (no logical type) → LONG
+        Arguments.of(
+            "int64-plain",
+            new MessageType("message", 
Types.required(PrimitiveTypeName.INT64).named("field1")),
+            100L,
+            500L,
+            InternalType.LONG,
+            100L,
+            500L),
+
+        // INT64 + Timestamp UTC MICROS → TIMESTAMP
+        Arguments.of(
+            "int64-timestamp-utc-micros",
+            new MessageType(
+                "message",
+                Types.required(PrimitiveTypeName.INT64)
+                    .as(
+                        LogicalTypeAnnotation.timestampType(
+                            true, LogicalTypeAnnotation.TimeUnit.MICROS))
+                    .named("field1")),
+            1_000_000L,
+            2_000_000L,
+            InternalType.TIMESTAMP,
+            1_000_000L,
+            2_000_000L),
+
+        // INT64 + Timestamp UTC MILLIS → TIMESTAMP
+        Arguments.of(
+            "int64-timestamp-utc-millis",
+            new MessageType(
+                "message",
+                Types.required(PrimitiveTypeName.INT64)
+                    .as(
+                        LogicalTypeAnnotation.timestampType(
+                            true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+                    .named("field1")),
+            1000L,
+            2000L,
+            InternalType.TIMESTAMP,
+            1000L,
+            2000L),
+
+        // INT64 + Timestamp UTC NANOS → TIMESTAMP
+        Arguments.of(
+            "int64-timestamp-utc-nanos",
+            new MessageType(
+                "message",
+                Types.required(PrimitiveTypeName.INT64)
+                    .as(
+                        LogicalTypeAnnotation.timestampType(
+                            true, LogicalTypeAnnotation.TimeUnit.NANOS))
+                    .named("field1")),
+            1_000_000_000L,
+            2_000_000_000L,
+            InternalType.TIMESTAMP,
+            1_000_000_000L,
+            2_000_000_000L),
+
+        // INT64 + Timestamp NTZ MICROS → TIMESTAMP_NTZ
+        Arguments.of(
+            "int64-timestamp-ntz-micros",
+            new MessageType(
+                "message",
+                Types.required(PrimitiveTypeName.INT64)
+                    .as(
+                        LogicalTypeAnnotation.timestampType(
+                            false, LogicalTypeAnnotation.TimeUnit.MICROS))
+                    .named("field1")),
+            1_000_000L,
+            2_000_000L,
+            InternalType.TIMESTAMP_NTZ,
+            1_000_000L,
+            2_000_000L),
+
+        // DOUBLE
+        Arguments.of(
+            "double",
+            new MessageType("message", 
Types.required(PrimitiveTypeName.DOUBLE).named("field1")),
+            1.5,
+            9.9,
+            InternalType.DOUBLE,
+            1.5,
+            9.9),
+
+        // FLOAT
+        Arguments.of(
+            "float",
+            new MessageType("message", 
Types.required(PrimitiveTypeName.FLOAT).named("field1")),
+            1.0f,
+            3.14f,
+            InternalType.FLOAT,
+            1.0f,
+            3.14f),
+
+        // INT32 + Date → DATE
+        Arguments.of(
+            "int32-date",
+            new MessageType(
+                "message",
+                Types.required(PrimitiveTypeName.INT32)
+                    .as(LogicalTypeAnnotation.dateType())
+                    .named("field1")),
+            18000,
+            19000,
+            InternalType.DATE,
+            18000,
+            19000),
+
+        // INT32 + Time MILLIS → INT
+        Arguments.of(
+            "int32-time-millis",
+            new MessageType(
+                "message",
+                Types.required(PrimitiveTypeName.INT32)
+                    .as(LogicalTypeAnnotation.timeType(true, 
LogicalTypeAnnotation.TimeUnit.MILLIS))
+                    .named("field1")),
+            0,
+            86_400_000,
+            InternalType.INT,
+            0,
+            86_400_000),
+
+        // BINARY plain (no logical type) → BYTES; range stays Binary
+        Arguments.of(
+            "binary-plain",
+            new MessageType("message", 
Types.required(PrimitiveTypeName.BINARY).named("field1")),
+            Binary.fromString("aaa"),
+            Binary.fromString("zzz"),
+            InternalType.BYTES,
+            Binary.fromString("aaa"),
+            Binary.fromString("zzz")),
+
+        // BINARY + STRING → STRING; range values are Java String
+        Arguments.of(
+            "binary-string",
+            new MessageType(
+                "message",
+                Types.required(PrimitiveTypeName.BINARY)
+                    .as(LogicalTypeAnnotation.stringType())
+                    .named("field1")),
+            Binary.fromString("apple"),
+            Binary.fromString("zebra"),
+            InternalType.STRING,
+            "apple",
+            "zebra"),
+
+        // BINARY + JSON → BYTES; range stays Binary
+        Arguments.of(
+            "binary-json",
+            new MessageType(
+                "message",
+                Types.required(PrimitiveTypeName.BINARY)
+                    .as(LogicalTypeAnnotation.jsonType())
+                    .named("field1")),
+            Binary.fromString("{\"a\":1}"),
+            Binary.fromString("{\"z\":99}"),
+            InternalType.BYTES,
+            Binary.fromString("{\"a\":1}"),
+            Binary.fromString("{\"z\":99}")),
+
+        // FIXED_LEN_BYTE_ARRAY(16) + UUID → UUID; range stays Binary
+        Arguments.of(
+            "fixed-uuid",
+            new MessageType(
+                "message",
+                Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+                    .length(16)
+                    .as(LogicalTypeAnnotation.uuidType())
+                    .named("field1")),
+            Binary.fromConstantByteArray(new byte[16]),
+            Binary.fromConstantByteArray(
+                new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}),
+            InternalType.UUID,
+            Binary.fromConstantByteArray(new byte[16]),
+            Binary.fromConstantByteArray(
+                new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1})),
+
+        // FIXED_LEN_BYTE_ARRAY(8) → BYTES; range stays Binary
+        Arguments.of(
+            "fixed-bytes",
+            new MessageType(
+                "message",
+                
Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(8).named("field1")),
+            Binary.fromConstantByteArray(new byte[] {0, 0, 0, 0, 0, 0, 0, 1}),
+            Binary.fromConstantByteArray(new byte[] {0, 0, 0, 0, 0, 1, 0, 0}),
+            InternalType.FIXED,
+            Binary.fromConstantByteArray(new byte[] {0, 0, 0, 0, 0, 0, 0, 1}),
+            Binary.fromConstantByteArray(new byte[] {0, 0, 0, 0, 0, 1, 0, 
0})));
+  }
 
-    w.writeDataPage(3, 4, BytesInput.from(bytes2), stats, BIT_PACKED, 
BIT_PACKED, PLAIN);
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("decimalTypeTestCases")
+  void testDecimalTypeStats(
+      String name,
+      MessageType schema,
+      Object minWrite,
+      Object maxWrite,
+      int expectedPrecision,
+      int expectedScale,
+      Object expectedMin,
+      Object expectedMax)
+      throws IOException {
+    File file = tempDir.resolve("parquet-test-" + name).toFile();
+    Path path = writeParquetFile(file, schema, minWrite, maxWrite);
+    List<ColumnStat> columnStats = readStats(path);
+    assertEquals(1, columnStats.size());
+    ColumnStat stat = columnStats.get(0);
+    assertEquals(InternalType.DECIMAL, 
stat.getField().getSchema().getDataType(), name);
+    Map<InternalSchema.MetadataKey, Object> metadata = 
stat.getField().getSchema().getMetadata();
+    assertEquals(
+        expectedPrecision,
+        metadata.get(InternalSchema.MetadataKey.DECIMAL_PRECISION),
+        name + " precision");
+    assertEquals(expectedScale, 
metadata.get(InternalSchema.MetadataKey.DECIMAL_SCALE));
+    assertEquals(expectedMin, stat.getRange().getMinValue());
+    assertEquals(expectedMax, stat.getRange().getMaxValue());
+  }
 
-    w.endColumn();
-    w.endBlock();
-    w.startBlock(4);
+  static Stream<Arguments> decimalTypeTestCases() {
+    // Note: LogicalTypeAnnotation.decimalType(scale, precision) — scale is 
first argument
+    return Stream.of(
+        // INT32 + Decimal(precision=9, scale=2) — INT32 supports precision up 
to 9
+        Arguments.of(
+            "int32-decimal",
+            new MessageType(
+                "message",
+                Types.required(PrimitiveTypeName.INT32)
+                    .as(LogicalTypeAnnotation.decimalType(2, 9))
+                    .named("decimal_field")),
+            100,
+            9999,
+            9,
+            2,
+            new BigDecimal("1.00"),
+            new BigDecimal("99.99")),
+
+        // INT64 + Decimal(precision=18, scale=6)
+        Arguments.of(
+            "int64-decimal",
+            new MessageType(
+                "message",
+                Types.required(PrimitiveTypeName.INT64)
+                    .as(LogicalTypeAnnotation.decimalType(6, 18))
+                    .named("decimal_field")),
+            1_000_000L,
+            9_999_999L,
+            18,
+            6,
+            new BigDecimal("1.000000"),
+            new BigDecimal("9.999999")),
+
+        // FIXED_LEN_BYTE_ARRAY(8) + Decimal(precision=18, scale=6); range is 
BigDecimal
+        Arguments.of(
+            "fixed-decimal",
+            new MessageType(
+                "message",
+                Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+                    .length(8)
+                    .as(LogicalTypeAnnotation.decimalType(6, 18))
+                    .named("decimal_field")),
+            Binary.fromConstantByteArray(new byte[] {0, 0, 0, 0, 0, 0, 0, 1}),
+            Binary.fromConstantByteArray(new byte[] {0, 0, 0, 0, 0, 1, 0, 0}),
+            18,
+            6,
+            new BigDecimal("0.000001"),
+            new BigDecimal("0.065536")),
+
+        // BINARY + Decimal; range is BigDecimal
+        Arguments.of(
+            "binary-decimal",
+            new MessageType(
+                "message",
+                Types.required(PrimitiveTypeName.BINARY)
+                    .as(LogicalTypeAnnotation.decimalType(4, 20))
+                    .named("decimal_field")),
+            Binary.fromConstantByteArray(new byte[] {0, 0, 0, 100}),
+            Binary.fromConstantByteArray(new byte[] {0, 0, 3, (byte) 0xe8}),
+            20,
+            4,
+            new BigDecimal("0.0100"),
+            new BigDecimal("0.1000")));
+  }
 
-    w.startColumn(c1, 1, codec);
-    w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED, 
BIT_PACKED, PLAIN);
+  @Test
+  void testMultipleColumnsStat() throws IOException {
+    MessageType schema =
+        new MessageType(
+            "message",
+            Types.required(PrimitiveTypeName.INT32).named("col_int"),
+            Types.required(PrimitiveTypeName.DOUBLE).named("col_double"));
 
-    w.endColumn();
-    w.endBlock();
-    w.end(new HashMap<String, String>());
-
-    // reconstruct the stats for the InternalDataFile testing object
-    BinaryStatistics stats_clone = new BinaryStatistics();
-    Binary originalBinary1 = Binary.fromString("1");
-    Binary originalBinary2 = Binary.fromString("2");
-    Binary originalBinary5 = Binary.fromString("5");
-    stats_clone.updateStats(Binary.fromByteArray(originalBinary1.getBytes()));
-    stats_clone.updateStats(Binary.fromByteArray(originalBinary2.getBytes()));
-    stats_clone.updateStats(Binary.fromByteArray(originalBinary5.getBytes()));
-
-    Binary minStat = stats_clone.genericGetMin();
-    Binary maxStat = stats_clone.genericGetMax();
-    PrimitiveType primitiveType =
-        new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "b");
-    List<Integer> col1NumValTotSize = new ArrayList<>(Arrays.asList(5, 1));
-    List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(54, 27));
-    List<ColumnStat> testColumnStats = new ArrayList<>();
-    String[] columnDotPath = {"a.b", "a.b"};
-    for (int i = 0; i < columnDotPath.length; i++) {
-      testColumnStats.add(
-          ColumnStat.builder()
-              .field(
-                  InternalField.builder()
-                      .name(primitiveType.getName())
-                      .parentPath(null)
-                      .schema(schemaExtractor.toInternalSchema(primitiveType, 
columnDotPath[i]))
-                      .build())
-              .numValues(col1NumValTotSize.get(i))
-              .totalSize(col2NumValTotSize.get(i))
-              .range(Range.vector(minStat, maxStat))
-              .build());
+    File file = tempDir.resolve("parquet-test-multi-col").toFile();
+    Path path = new Path(file.toURI());
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+    try (ParquetWriter<Group> writer = 
ExampleParquetWriter.builder(path).withConf(conf).build()) {
+      Group row1 = factory.newGroup();
+      row1.add("col_int", 5);
+      row1.add("col_double", 1.5);
+      Group row2 = factory.newGroup();
+      row2.add("col_int", 50);
+      row2.add("col_double", 9.9);
+      writer.write(row1);
+      writer.write(row2);
     }
 
-    return testColumnStats;
+    List<ColumnStat> columnStats = readStats(path);
+    assertEquals(2, columnStats.size());
+
+    ColumnStat intStat =
+        columnStats.stream()
+            .filter(s -> s.getField().getName().equals("col_int"))
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("col_int not found"));
+    ColumnStat doubleStat =
+        columnStats.stream()
+            .filter(s -> s.getField().getName().equals("col_double"))
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("col_double not found"));
+
+    assertEquals(InternalType.INT, 
intStat.getField().getSchema().getDataType());
+    assertEquals(5, intStat.getRange().getMinValue());
+    assertEquals(50, intStat.getRange().getMaxValue());
+
+    assertEquals(InternalType.DOUBLE, 
doubleStat.getField().getSchema().getDataType());
+    assertEquals(1.5, doubleStat.getRange().getMinValue());
+    assertEquals(9.9, doubleStat.getRange().getMaxValue());
   }
 
-  public static List<ColumnStat> initIntFileTest(File file) throws IOException 
{
-    // create the parquet file by parsing a schema
-    Path path = new Path(file.toURI());
-    Configuration configuration = new Configuration();
-
+  @Test
+  void testMultipleRowGroupsStat() throws IOException {
     MessageType schema =
-        MessageTypeParser.parseMessageType("message m { required group a 
{required int32 b;}}");
-    String[] columnPath = {"a", "b"};
-    ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
-
-    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
-
-    IntStatistics stats = new IntStatistics();
-    stats.updateStats(1);
-    stats.updateStats(2);
-    stats.updateStats(5);
+        new MessageType("message", 
Types.required(PrimitiveTypeName.INT32).named("field1"));
+    ColumnDescriptor col = schema.getColumns().get(0);
+    IntStatistics stats1 = intStats(10, 100);
+    IntStatistics stats2 = intStats(200, 500);
 
-    // to simplify the test we keep the same stats for both columns
-    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+    File file = tempDir.resolve("parquet-test-multi-rg").toFile();
+    Path path = new Path(file.toURI());
+    ParquetFileWriter w = new ParquetFileWriter(conf, schema, path);
     w.start();
-    w.startBlock(3);
-
-    w.startColumn(c1, 2, codec);
-
-    w.writeDataPage(3, 3, BytesInput.fromInt(3), stats, BIT_PACKED, 
BIT_PACKED, PLAIN);
-
-    w.writeDataPage(3, 3, BytesInput.fromInt(2), stats, BIT_PACKED, 
BIT_PACKED, PLAIN);
+    w.startBlock(10);
+    w.startColumn(col, 10, CompressionCodecName.UNCOMPRESSED);
+    w.writeDataPage(10, 4, BytesInput.fromInt(0), stats1, BIT_PACKED, 
BIT_PACKED, PLAIN);
     w.endColumn();
     w.endBlock();
-    w.startBlock(4);
-
-    w.startColumn(c1, 1, codec);
-
-    w.writeDataPage(3, 3, BytesInput.fromInt(1), stats, BIT_PACKED, 
BIT_PACKED, PLAIN);
+    w.startBlock(5);
+    w.startColumn(col, 5, CompressionCodecName.UNCOMPRESSED);
+    w.writeDataPage(5, 4, BytesInput.fromInt(0), stats2, BIT_PACKED, 
BIT_PACKED, PLAIN);
     w.endColumn();
     w.endBlock();
-    w.end(new HashMap<String, String>());
-
-    // reconstruct the stats for the InternalDataFile testing object
-
-    java.lang.Integer minStat = stats.genericGetMin();
-    java.lang.Integer maxStat = stats.genericGetMax();
-    PrimitiveType primitiveType =
-        new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.INT32, "b");
-    List<Integer> col1NumValTotSize = new ArrayList<>(Arrays.asList(2, 1));
-    List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(54, 27));
-    List<ColumnStat> testColumnStats = new ArrayList<>();
-    String[] columnDotPath = {"a.b", "a.b"};
-    for (int i = 0; i < columnDotPath.length; i++) {
-      testColumnStats.add(
-          ColumnStat.builder()
-              .field(
-                  InternalField.builder()
-                      .name(primitiveType.getName())
-                      .parentPath(null)
-                      .schema(schemaExtractor.toInternalSchema(primitiveType, 
columnDotPath[i]))
-                      .build())
-              .numValues(col1NumValTotSize.get(i))
-              .totalSize(col2NumValTotSize.get(i))
-              .range(Range.vector(minStat, maxStat))
-              .build());
-    }
-
-    return testColumnStats;
+    w.end(new HashMap<>());
+
+    List<ColumnStat> columnStats = readStats(path);
+    assertEquals(1, columnStats.size());
+    ColumnStat columnStat = columnStats.get(0);
+    assertEquals(10, columnStat.getRange().getMinValue());
+    assertEquals(500, columnStat.getRange().getMaxValue());
+    assertEquals(15L, columnStat.getNumValues());
   }
 
   @Test
-  public void testInternalDataFileStringStat() throws IOException {
-
-    Configuration configuration = new Configuration();
-
-    java.nio.file.Path path = tempDir.resolve("parquet-test-string-file");
-    File file = path.toFile();
-    file.deleteOnExit();
-    List<ColumnStat> testColumnStats = initStringFileTest(file);
-    Path hadoopPath = new Path(file.toURI());
-    // statsExtractor toInternalDataFile testing
-    InternalDataFile internalDataFile =
-        statsExtractor.toInternalDataFile(configuration, hadoopPath);
-    InternalDataFile testInternalFile =
-        InternalDataFile.builder()
-            .physicalPath(
-                "file:"
-                    .concat(
-                        
file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
-            .columnStats(testColumnStats)
-            .fileFormat(FileFormat.APACHE_PARQUET)
-            .lastModified(file.lastModified())
-            .fileSizeBytes(file.length())
-            .recordCount(8)
-            .build();
-
-    Assertions.assertEquals(testInternalFile, internalDataFile);
-  }
+  void testAllNullColumnStats() throws IOException {
+    MessageType schema =
+        new MessageType("message", 
Types.optional(PrimitiveTypeName.INT32).named("col"));
+    File file = tempDir.resolve("parquet-all-null").toFile();
+    Path path = new Path(file.toURI());
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+    try (ParquetWriter<Group> writer = 
ExampleParquetWriter.builder(path).withConf(conf).build()) {
+      writer.write(factory.newGroup());
+      writer.write(factory.newGroup());
+      writer.write(factory.newGroup());
+    }
 
-  @Test
-  public void testInternalDataFileBinaryStat() throws IOException {
-
-    Configuration configuration = new Configuration();
-
-    java.nio.file.Path path = tempDir.resolve("parquet-test-binary-file");
-    File file = path.toFile();
-    file.deleteOnExit();
-    List<ColumnStat> testColumnStats = initBinaryFileTest(file);
-    Path hadoopPath = new Path(file.toURI());
-    // statsExtractor toInternalDataFile testing
-    InternalDataFile internalDataFile =
-        statsExtractor.toInternalDataFile(configuration, hadoopPath);
-    InternalDataFile testInternalFile =
-        InternalDataFile.builder()
-            .physicalPath(
-                "file:"
-                    .concat(
-                        
file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
-            .columnStats(testColumnStats)
-            .fileFormat(FileFormat.APACHE_PARQUET)
-            .lastModified(file.lastModified())
-            .fileSizeBytes(file.length())
-            .recordCount(5)
-            .build();
-
-    Assertions.assertEquals(testInternalFile, internalDataFile);
+    List<ColumnStat> columnStats = readStats(path);
+    assertEquals(1, columnStats.size());
+    ColumnStat stat = columnStats.get(0);
+    assertEquals(3L, stat.getNumNulls());
+    assertNull(stat.getRange().getMinValue());
+    assertNull(stat.getRange().getMaxValue());
   }
 
   @Test
-  public void testInternalDataFileIntStat() throws IOException {
-
-    Configuration configuration = new Configuration();
-    java.nio.file.Path path = tempDir.resolve("parquet-test-int-file");
-    File file = path.toFile();
-    file.deleteOnExit();
-    List<ColumnStat> testColumnStats = initIntFileTest(file);
-    Path hadoopPath = new Path(file.toURI());
-    // statsExtractor toInternalDataFile testing
-    InternalDataFile internalDataFile =
-        statsExtractor.toInternalDataFile(configuration, hadoopPath);
-    InternalDataFile testInternalFile =
-        InternalDataFile.builder()
-            .physicalPath(
-                "file:"
-                    .concat(
-                        
file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
-            .columnStats(testColumnStats)
-            .fileFormat(FileFormat.APACHE_PARQUET)
-            .lastModified(file.lastModified())
-            .fileSizeBytes(file.length())
-            .recordCount(2)
-            .build();
-
-    Assertions.assertEquals(testInternalFile, internalDataFile);
-  }
+  void testNumNullsIsSet() throws IOException {
+    MessageType schema =
+        new MessageType("message", 
Types.optional(PrimitiveTypeName.INT32).named("col"));
+    File file = tempDir.resolve("parquet-partial-null").toFile();
+    Path path = new Path(file.toURI());
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+    try (ParquetWriter<Group> writer = 
ExampleParquetWriter.builder(path).withConf(conf).build()) {
+      Group row1 = factory.newGroup();
+      row1.add("col", 10);
+      writer.write(row1);
+      writer.write(factory.newGroup()); // null row
+      Group row3 = factory.newGroup();
+      row3.add("col", 50);
+      writer.write(row3);
+    }
 
-  @Test
-  public void testInternalDataFileBooleanStat() throws IOException {
-    Configuration configuration = new Configuration();
-
-    java.nio.file.Path path = tempDir.resolve("parquet-test-boolean-file");
-    File file = path.toFile();
-    file.deleteOnExit();
-
-    List<ColumnStat> testColumnStats = initBooleanFileTest(file);
-    Path hadoopPath = new Path(file.toURI());
-    // statsExtractor toInternalDataFile testing
-    InternalDataFile internalDataFile =
-        statsExtractor.toInternalDataFile(configuration, hadoopPath);
-    InternalDataFile testInternalFile =
-        InternalDataFile.builder()
-            .physicalPath(
-                "file:"
-                    .concat(
-                        
file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
-            .columnStats(testColumnStats)
-            .fileFormat(FileFormat.APACHE_PARQUET)
-            .lastModified(file.lastModified())
-            .fileSizeBytes(file.length())
-            .recordCount(8)
-            .build();
-
-    Assertions.assertEquals(testInternalFile, internalDataFile);
+    List<ColumnStat> columnStats = readStats(path);
+    assertEquals(1, columnStats.size());
+    ColumnStat stat = columnStats.get(0);
+    assertEquals(1L, stat.getNumNulls());
+    assertEquals(10, stat.getRange().getMinValue());
+    assertEquals(50, stat.getRange().getMaxValue());
   }
 }

Reply via email to