This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new 108ec647 [553] Parquet Schema and Column Stats Converters (#669)
108ec647 is described below

commit 108ec6473c6419a6b93ad0ffe2d39e46d9290f07
Author: Selim Soufargi <80632333+unical1...@users.noreply.github.com>
AuthorDate: Sat May 17 21:00:13 2025 +0200

    [553] Parquet Schema and Column Stats Converters (#669)
    
    * smaller PR for parquet
    
    * read parquet file for metadataExtractor: compiling, not tested
    
    * cleanups for statsExtractor: compiling, not tested
    
    * refactoring for statsExtractor: compiling, not tested
    
    * added avro dependency
    
    * added tests for SchemaExtractor: int and string primitiveTypes test passes
    
    * fixed some minor bugs in SchemaExtractor
    
    * close fileReader and handle exception
    
    * adjusted fromInternalSchema()
    
    * added a test and adjusted SchemaExtractor
    
    * added testing code
    
    * bug fix for Schema extractor: groupType
    
    * bug fix for Schema extractor
    
    * bug fix for tests
    
    * bug fix for SchemaExtractor and added tests for nested lists support
    
    * bug fix for tests for nested lists support
    
    * bug fix for complex test which now passes!
    
    * added test for Map
    
    * schemaExtractor refactored
    
    * bug fix for isNullable() schema handling
    
    * fromInternalSchema : list and map types
    
    * decimal primitive test added
    
    * float primitive + list and map tests for fromInternalSchema
    
    * added tests for primitive type (date and timestamp)
    
    * refactoring for partitionValues extractor
    
    * git build error fixed
    
    * cleanups for schemaExtractor + refactoring for schemaExtractorTests + added test code for statsExtractor
    
    * added assertEquals test for stats + removed partitionFields from the test, TODO check if field is needed in ColumnStats
    
    * bug fix for stats tests: columnStats + test data are read using FileReader
    
    * bug fixed for stats tests, TODO equality test for two objects
    
    * added compareFiles() in InternalDataFile for the statsExtractor tests to pass: OK
    
    * added custom comparison test for ColumnStat and InternalDataFile, test passes, TODO: other stat types and other schema types testing
    
    * added custom comparison test for ColumnStat (field) and exec spotless apply
    
    * tempDir for parquet stats testing
    
    * binaryStatistics test passes
    
    * added int32 file schema test for statsExtractor
    
    * cleanups + added fields comparison for InternalDataFile
    
    * cleanups + added fixed_len_byte_array primitive type schema file test
    
    * use of genericGetMax instead for stats extraction + cleanups
    
    * boolean schema file test for statsExtractor added
    
    * removed hard-coded path in statsExtractor test
    
    * cleanups + imports
    
    * separate tests for int and binary for stats
    
    * custom equals() not needed for InternalDataFile and ColumnStat
    
    * removed parquet version from core sub-project pom
    
    * statsExtractor tests as a suite, removed comments + run spotless apply
    
    * removed unnecessary classes
    
    * removed unnecessary classes: undo
    
    * undo irrelevant changes
    
    * fixed formatting issues with spotless:apply cmd
    
    * cleanups for test class and fixes for failed build
    
    * tmp file name fixed for failed build
    
    * cleanups
    
    * spotless apply run + internalDataFile equality assertion changed to display errors
    
    * fixes for build, PhysicalPath and BinaryStats
    
    * fixes for build, PhysicalPath and BinaryStats + synced fork
    
    * fixes for build, PhysicalPath and BinaryStats + synced fork
    
    * fixes for build and cleanups
    
    * fixes for build and cleanups
    
    * Parquet dep set as provided to use Spark's
    
    * parquet dep version back to 1.15.1
    
    * parquet-avro moved from core to project's pom
    
    * parquet-avro moved after hadoop-common
    
    * parquet dep scope removed
    
    * run spotless:apply
    
    ---------
    
    Co-authored-by: Selim Soufargi <ssoufargi.idealab.uni...@gmail.com>
---
 .../apache/xtable/conversion/ExternalTable.java    |   4 +
 .../ThreePartHierarchicalTableIdentifier.java      |   1 +
 .../apache/xtable/model/schema/InternalField.java  |   2 +
 .../apache/xtable/model/schema/InternalSchema.java |   3 +-
 xtable-core/pom.xml                                |   6 +-
 .../xtable/parquet/ParquetMetadataExtractor.java   |  63 +++
 .../xtable/parquet/ParquetSchemaExtractor.java     | 494 +++++++++++++++++++++
 .../xtable/parquet/ParquetStatsExtractor.java      | 154 +++++++
 .../org/apache/xtable/TestAbstractHudiTable.java   |   1 +
 .../java/org/apache/xtable/TestJavaHudiTable.java  |   1 +
 .../org/apache/xtable/iceberg/StubCatalog.java     |   1 +
 .../xtable/parquet/TestParquetSchemaExtractor.java | 344 ++++++++++++++
 .../xtable/parquet/TestParquetStatsExtractor.java  | 454 +++++++++++++++++++
 .../apache/xtable/utilities/RunCatalogSync.java    |   7 +
 14 files changed, 1533 insertions(+), 2 deletions(-)

diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
index 939c59c0..a6c97d8f 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
@@ -34,12 +34,16 @@ import com.google.common.base.Preconditions;
 class ExternalTable {
   /** The name of the table. */
   protected final @NonNull String name;
+
   /** The format of the table (e.g. DELTA, ICEBERG, HUDI) */
   protected final @NonNull String formatName;
+
   /** The path to the root of the table or the metadata directory depending on the format */
   protected final @NonNull String basePath;
+
   /** Optional namespace for the table */
   protected final String[] namespace;
+
   /** The configuration for interacting with the catalog that manages this table */
   protected final CatalogConfig catalogConfig;
 
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
index 2608d36a..f387c7d3 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
@@ -42,6 +42,7 @@ public class ThreePartHierarchicalTableIdentifier implements HierarchicalTableId
    * name varies depending on the catalogType.
    */
   String catalogName;
+
   /**
   * Catalogs have the ability to group tables logically, databaseName is the identifier for such
   * logical classification. The alternate names for this field include namespace, schemaName etc.
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
index 16b6da8a..31eb0ed4 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
@@ -43,9 +43,11 @@ public class InternalField {
  // The id field for the field. This is used to identify the field in the schema even after
   // renames.
   Integer fieldId;
+
   // represents the fully qualified path to the field (dot separated)
   @Getter(lazy = true)
   String path = createPath();
+
   // splits the dot separated path into parts
   @Getter(lazy = true)
   String[] pathParts = splitPath();
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
index 20af37e0..8b7e0fc5 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
@@ -75,7 +75,8 @@ public class InternalSchema {
 
   public enum MetadataValue {
     MICROS,
-    MILLIS
+    MILLIS,
+    NANOS
   }
 
   public static final String XTABLE_LOGICAL_TYPE = "xtableLogicalType";
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index 24bc31df..6bd5282c 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -56,7 +56,7 @@
             <artifactId>guava</artifactId>
         </dependency>
 
-        <!-- Avro -->
+
         <dependency>
             <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
@@ -116,6 +116,10 @@
             <artifactId>hadoop-common</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+        </dependency>
 
         <!-- Logging API -->
         <dependency>
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
new file mode 100644
index 00000000..f29a186d
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.exception.ReadException;
+
+public class ParquetMetadataExtractor {
+
+  private static final ParquetMetadataExtractor INSTANCE = new ParquetMetadataExtractor();
+
+  public static ParquetMetadataExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public static MessageType getSchema(ParquetMetadata footer) {
+    return footer.getFileMetaData().getSchema();
+  }
+
+  public static ParquetMetadata readParquetMetadata(Configuration conf, Path filePath) {
+    InputFile file = null;
+    try {
+      file = HadoopInputFile.fromPath(filePath, conf);
+    } catch (IOException e) {
+      throw new ReadException("Failed to read the parquet file", e);
+    }
+
+    ParquetReadOptions options = HadoopReadOptions.builder(conf, filePath).build();
+    try (ParquetFileReader fileReader = ParquetFileReader.open(file, options)) {
+      return fileReader.getFooter();
+    } catch (Exception e) {
+      throw new ReadException("Failed to read the parquet file", e);
+    }
+  }
+}
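
For context, a minimal usage sketch of the new metadata extractor (not part of the commit; the local file path and default Configuration are illustrative assumptions):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.schema.MessageType;

    import org.apache.xtable.parquet.ParquetMetadataExtractor;

    public class MetadataExtractorExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // hypothetical path; any parquet file with a valid footer works here
        Path filePath = new Path("/tmp/data/part-00000.parquet");
        // read the footer once, then pull the schema out of it
        ParquetMetadata footer = ParquetMetadataExtractor.readParquetMetadata(conf, filePath);
        MessageType schema = ParquetMetadataExtractor.getSchema(footer);
        System.out.println(schema);
      }
    }
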
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
new file mode 100644
index 00000000..9043ac0d
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.avro.Schema;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.parquet.schema.Types;
+
+import org.apache.xtable.collectors.CustomCollectors;
+import org.apache.xtable.exception.SchemaExtractorException;
+import org.apache.xtable.exception.UnsupportedSchemaTypeException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.schema.SchemaUtils;
+
+/**
+ * Class that converts a parquet schema {@link Type} to the canonical schema {@link InternalSchema}
+ * and vice versa. This conversion is fully reversible and there is a strict one-to-one mapping
+ * between parquet data types and canonical data types.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ParquetSchemaExtractor {
+  // parquet only supports string keys in maps
+  private static final InternalField MAP_KEY_FIELD =
+      InternalField.builder()
+          .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+          .schema(
+              InternalSchema.builder()
+                  .name("map_key")
+                  .dataType(InternalType.STRING)
+                  .isNullable(false)
+                  .build())
+          .defaultValue("")
+          .build();
+  private static final ParquetSchemaExtractor INSTANCE = new ParquetSchemaExtractor();
+  private static final String ELEMENT = "element";
+  private static final String KEY = "key";
+  private static final String VALUE = "value";
+
+  public static ParquetSchemaExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  private static boolean isNullable(Type schema) {
+    return schema.getRepetition() != Repetition.REQUIRED;
+  }
+
+  /**
+   * Converts the parquet {@link Type} to {@link InternalSchema}.
+   *
+   * @param schema The schema being converted
+   * @param parentPath If this schema is nested within another, this will be a dot separated string
+   *     representing the path from the topmost field to the current schema.
+   * @return a converted schema
+   */
+  public InternalSchema toInternalSchema(Type schema, String parentPath) {
+    InternalType newDataType = null;
+    Type.Repetition currentRepetition = null;
+    List<InternalField> subFields = null;
+    PrimitiveType primitiveType;
+    LogicalTypeAnnotation logicalType;
+    Map<InternalSchema.MetadataKey, Object> metadata =
+        new EnumMap<>(InternalSchema.MetadataKey.class);
+    String elementName = schema.getName();
+    if (schema.isPrimitive()) {
+      primitiveType = schema.asPrimitiveType();
+      switch (primitiveType.getPrimitiveTypeName()) {
+          // PrimitiveTypes
+        case INT64:
+          logicalType = schema.getLogicalTypeAnnotation();
+          if (logicalType instanceof 
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+            LogicalTypeAnnotation.TimeUnit timeUnit =
+                ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) 
logicalType).getUnit();
+            boolean isAdjustedToUTC =
+                ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) 
logicalType)
+                    .isAdjustedToUTC();
+            if (isAdjustedToUTC) {
+              newDataType = InternalType.TIMESTAMP;
+            } else {
+              newDataType = InternalType.TIMESTAMP_NTZ;
+            }
+            if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS) {
+              metadata.put(
+                  InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+                  InternalSchema.MetadataValue.MICROS);
+            } else if (timeUnit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
+              metadata.put(
+                  InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+                  InternalSchema.MetadataValue.MILLIS);
+            } else if (timeUnit == LogicalTypeAnnotation.TimeUnit.NANOS) {
+              metadata.put(
+                  InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+                  InternalSchema.MetadataValue.NANOS);
+            }
+          } else if (logicalType instanceof 
LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
+            newDataType = InternalType.INT;
+          } else if (logicalType instanceof 
LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
+            LogicalTypeAnnotation.TimeUnit timeUnit =
+                ((LogicalTypeAnnotation.TimeLogicalTypeAnnotation) 
logicalType).getUnit();
+            if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS
+                || timeUnit == LogicalTypeAnnotation.TimeUnit.NANOS) {
+              newDataType = InternalType.INT;
+            }
+          } else if (logicalType instanceof 
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+
+            metadata.put(
+                InternalSchema.MetadataKey.DECIMAL_PRECISION,
+                ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) 
logicalType).getPrecision());
+            metadata.put(
+                InternalSchema.MetadataKey.DECIMAL_SCALE,
+                ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) 
logicalType).getScale());
+            newDataType = InternalType.DECIMAL;
+
+          } else {
+            newDataType = InternalType.LONG;
+          }
+          break;
+        case INT32:
+          logicalType = schema.getLogicalTypeAnnotation();
+          if (logicalType instanceof 
LogicalTypeAnnotation.DateLogicalTypeAnnotation) {
+            newDataType = InternalType.DATE;
+          } else if (logicalType instanceof 
LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
+            LogicalTypeAnnotation.TimeUnit timeUnit =
+                ((LogicalTypeAnnotation.TimeLogicalTypeAnnotation) 
logicalType).getUnit();
+            if (timeUnit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
+              newDataType = InternalType.INT;
+            }
+          } else if (logicalType instanceof 
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+            metadata.put(
+                InternalSchema.MetadataKey.DECIMAL_PRECISION,
+                ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) 
logicalType).getPrecision());
+            metadata.put(
+                InternalSchema.MetadataKey.DECIMAL_SCALE,
+                ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) 
logicalType).getScale());
+            newDataType = InternalType.DECIMAL;
+
+          } else {
+            newDataType = InternalType.INT;
+          }
+          break;
+        case FLOAT:
+          newDataType = InternalType.FLOAT;
+          break;
+        case FIXED_LEN_BYTE_ARRAY:
+          logicalType = schema.getLogicalTypeAnnotation();
+          if (logicalType instanceof 
LogicalTypeAnnotation.UUIDLogicalTypeAnnotation) {
+            newDataType = InternalType.UUID;
+          } else if (logicalType instanceof 
LogicalTypeAnnotation.IntervalLogicalTypeAnnotation) {
+            metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, 12);
+            newDataType = InternalType.FIXED;
+          } else if (logicalType instanceof 
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+            metadata.put(
+                InternalSchema.MetadataKey.DECIMAL_PRECISION,
+                ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) 
logicalType).getPrecision());
+            metadata.put(
+                InternalSchema.MetadataKey.DECIMAL_SCALE,
+                ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) 
logicalType).getScale());
+            newDataType = InternalType.DECIMAL;
+          }
+          break;
+        case BINARY:
+          // TODO: VARIANT, GEOMETRY, GEOGRAPHY
+          logicalType = schema.getLogicalTypeAnnotation();
+          if (logicalType instanceof 
LogicalTypeAnnotation.EnumLogicalTypeAnnotation) {
+            metadata.put(
+                InternalSchema.MetadataKey.ENUM_VALUES, 
logicalType.toOriginalType().values());
+            newDataType = InternalType.ENUM;
+          } else if (logicalType instanceof 
LogicalTypeAnnotation.JsonLogicalTypeAnnotation) {
+            newDataType = InternalType.BYTES;
+          } else if (logicalType instanceof 
LogicalTypeAnnotation.BsonLogicalTypeAnnotation) {
+            newDataType = InternalType.BYTES;
+          } else if (logicalType instanceof 
LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
+            newDataType = InternalType.STRING;
+          } else if (logicalType instanceof 
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+            metadata.put(
+                InternalSchema.MetadataKey.DECIMAL_PRECISION,
+                ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) 
logicalType).getPrecision());
+            metadata.put(
+                InternalSchema.MetadataKey.DECIMAL_SCALE,
+                ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) 
logicalType).getScale());
+            newDataType = InternalType.DECIMAL;
+
+          } else {
+            newDataType = InternalType.BYTES;
+          }
+          break;
+        case BOOLEAN:
+          newDataType = InternalType.BOOLEAN;
+          break;
+        default:
+          throw new UnsupportedSchemaTypeException(
+              String.format("Unsupported schema type %s", schema));
+      }
+    } else {
+      // GroupTypes
+      logicalType = schema.getLogicalTypeAnnotation();
+      if (logicalType instanceof 
LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
+        Type.ID schemaId = schema.getId();
+        InternalSchema elementSchema =
+            toInternalSchema(
+                schema.asGroupType().getType(0),
+                SchemaUtils.getFullyQualifiedPath(
+                    parentPath, 
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME));
+        InternalField elementField =
+            InternalField.builder()
+                .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+                .parentPath(parentPath)
+                .schema(elementSchema)
+                .fieldId(schemaId == null ? null : schemaId.intValue())
+                .build();
+        return InternalSchema.builder()
+            .name(schema.getName())
+            .dataType(InternalType.LIST)
+            .comment(null)
+            .isNullable(isNullable(schema.asGroupType()))
+            .fields(Collections.singletonList(elementField))
+            .build();
+      } else if (logicalType instanceof 
LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
+        String schemaName = schema.asGroupType().getName();
+        Type.ID schemaId = schema.getId();
+        InternalSchema valueSchema =
+            toInternalSchema(
+                schema.asGroupType().getType(0),
+                SchemaUtils.getFullyQualifiedPath(
+                    parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME));
+        InternalField valueField =
+            InternalField.builder()
+                .name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
+                .parentPath(parentPath)
+                .schema(valueSchema)
+                .fieldId(schemaId == null ? null : schemaId.intValue())
+                .build();
+        return InternalSchema.builder()
+            .name(schemaName)
+            .dataType(InternalType.MAP)
+            .comment(null)
+            .isNullable(isNullable(schema.asGroupType()))
+            .fields(valueSchema.getFields())
+            .build();
+      } else {
+
+        subFields = new ArrayList<>(schema.asGroupType().getFields().size());
+        for (Type parquetField : schema.asGroupType().getFields()) {
+          String fieldName = parquetField.getName();
+          Type.ID fieldId = parquetField.getId();
+          currentRepetition = parquetField.getRepetition();
+          InternalSchema subFieldSchema =
+              toInternalSchema(
+                  parquetField, SchemaUtils.getFullyQualifiedPath(parentPath, 
fieldName));
+
+          if (schema.asGroupType().getFields().size()
+              == 1) { // TODO Tuple (many subelements in a list)
+            newDataType = subFieldSchema.getDataType();
+            elementName = subFieldSchema.getName();
+            break;
+          }
+          subFields.add(
+              InternalField.builder()
+                  .parentPath(parentPath)
+                  .name(fieldName)
+                  .schema(subFieldSchema)
+                  .defaultValue(null)
+                  .fieldId(fieldId == null ? null : fieldId.intValue())
+                  .build());
+        }
+        if (currentRepetition != Repetition.REPEATED
+            // use equals() for string comparison; != compares references
+            && !"list".equals(schema.asGroupType().getName())
+            && !Arrays.asList("key_value", "map").contains(schema.asGroupType().getName())) {
+          return InternalSchema.builder()
+              .name(schema.getName())
+              .comment(null)
+              .dataType(InternalType.RECORD)
+              .fields(subFields)
+              .isNullable(isNullable(schema.asGroupType()))
+              .build();
+        }
+      }
+    }
+    return InternalSchema.builder()
+        .name(elementName)
+        .dataType(newDataType)
+        .fields(subFields == null || subFields.size() == 0 ? null : subFields)
+        .comment(null)
+        .isNullable(isNullable(schema))
+        .metadata(metadata.isEmpty() ? null : metadata)
+        .build();
+  }
+
+  /**
+   * Internal method for converting the {@link InternalSchema} to a parquet {@link Type}.
+   *
+   * @param internalSchema internal schema representation
+   * @param currentPath If this schema is nested within another, this will be a dot separated
+   *     string. This is used for the parquet namespace to guarantee unique names for nested
+   *     records.
+   * @return a parquet schema
+   */
+  public Type fromInternalSchema(InternalSchema internalSchema, String 
currentPath) {
+    Type type = null;
+    Type listType = null;
+    Type mapKeyType = null;
+    Type mapValueType = null;
+    String fieldName = internalSchema.getName();
+    InternalType internalType = internalSchema.getDataType();
+    switch (internalType) {
+      case BOOLEAN:
+        // BOOLEAN takes no logical type annotation; intType() is only valid on integers
+        type = Types.required(PrimitiveTypeName.BOOLEAN).named(fieldName);
+        break;
+      case INT:
+        type =
+            Types.required(PrimitiveTypeName.INT32)
+                .as(LogicalTypeAnnotation.intType(32, false))
+                .named(fieldName);
+        break;
+      case LONG:
+        type =
+            Types.required(PrimitiveTypeName.INT64)
+                .as(LogicalTypeAnnotation.intType(64, false))
+                .named(fieldName);
+        break;
+      case STRING:
+        type =
+            Types.required(PrimitiveTypeName.BINARY)
+                .as(LogicalTypeAnnotation.stringType())
+                .named(fieldName);
+        break;
+      case FLOAT:
+        type = Types.required(PrimitiveTypeName.FLOAT).named(fieldName);
+        break;
+      case DECIMAL:
+        int precision =
+            (int) internalSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION);
+        int scale =
+            (int) internalSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE);
+        type =
+            // DECIMAL cannot annotate FLOAT; BINARY is one of the primitive types
+            // the parquet format allows DECIMAL to annotate
+            Types.required(PrimitiveTypeName.BINARY)
+                .as(LogicalTypeAnnotation.decimalType(scale, precision))
+                .named(fieldName);
+        break;
+
+      case ENUM:
+        type =
+            new org.apache.parquet.avro.AvroSchemaConverter()
+                .convert(
+                    Schema.createEnum(
+                        fieldName,
+                        internalSchema.getComment(),
+                        null,
+                        (List<String>)
+                            internalSchema
+                                .getMetadata()
+                                .get(InternalSchema.MetadataKey.ENUM_VALUES),
+                        null))
+                .getType(fieldName);
+        break;
+      case DATE:
+        type =
+            Types.required(PrimitiveTypeName.INT32)
+                .as(LogicalTypeAnnotation.dateType())
+                .named(fieldName);
+        break;
+      case TIMESTAMP:
+        if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
+            == InternalSchema.MetadataValue.MICROS) {
+          type =
+              Types.required(PrimitiveTypeName.INT64)
+                  .as(
+                      LogicalTypeAnnotation.timestampType(
+                          true, LogicalTypeAnnotation.TimeUnit.MICROS))
+                  .named(fieldName);
+        } else if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
+            == InternalSchema.MetadataValue.MILLIS) {
+          type =
+              Types.required(PrimitiveTypeName.INT64)
+                  .as(
+                      LogicalTypeAnnotation.timestampType(
+                          true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+                  .named(fieldName);
+        } else if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
+            == InternalSchema.MetadataValue.NANOS) {
+          type =
+              Types.required(PrimitiveTypeName.INT64)
+                  .as(
+                      LogicalTypeAnnotation.timestampType(
+                          true, LogicalTypeAnnotation.TimeUnit.NANOS))
+                  .named(fieldName);
+        }
+        break;
+      case TIMESTAMP_NTZ:
+        // TIMESTAMP_NTZ is not adjusted to UTC, so isAdjustedToUTC must be false
+        if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
+            == InternalSchema.MetadataValue.MICROS) {
+          type =
+              Types.required(PrimitiveTypeName.INT64)
+                  .as(
+                      LogicalTypeAnnotation.timestampType(
+                          false, LogicalTypeAnnotation.TimeUnit.MICROS))
+                  .named(fieldName);
+        } else {
+          type =
+              Types.required(PrimitiveTypeName.INT64)
+                  .as(
+                      LogicalTypeAnnotation.timestampType(
+                          false, LogicalTypeAnnotation.TimeUnit.MILLIS))
+                  .named(fieldName);
+        }
+        break;
+      case LIST:
+        InternalField elementField =
+            internalSchema.getFields().stream()
+                .filter(
+                    field ->
+                        
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(field.getName()))
+                .findFirst()
+                .orElseThrow(() -> new SchemaExtractorException("Invalid array 
schema"));
+        listType = fromInternalSchema(elementField.getSchema(), 
elementField.getPath());
+        type = 
Types.requiredList().setElementType(listType).named(internalSchema.getName());
+        // TODO nullable lists
+        break;
+      case MAP:
+        InternalField keyField =
+            internalSchema.getFields().stream()
+                .filter(field -> 
InternalField.Constants.MAP_KEY_FIELD_NAME.equals(field.getName()))
+                .findFirst()
+                .orElseThrow(() -> new SchemaExtractorException("Invalid map 
schema"));
+        InternalField valueField =
+            internalSchema.getFields().stream()
+                .filter(
+                    field -> 
InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(field.getName()))
+                .findFirst()
+                .orElseThrow(() -> new SchemaExtractorException("Invalid map 
schema"));
+        // resolve the key schema against the key field's path, not the value's
+        mapKeyType = fromInternalSchema(keyField.getSchema(), keyField.getPath());
+        mapValueType = fromInternalSchema(valueField.getSchema(), valueField.getPath());
+        type =
+            Types.requiredMap().key(mapKeyType).value(mapValueType).named(internalSchema.getName());
+        // TODO nullable maps
+        break;
+      case RECORD:
+        List<Type> fields =
+            internalSchema.getFields().stream()
+                .map(
+                    field ->
+                        fromInternalSchema(
+                            field.getSchema(),
+                            SchemaUtils.getFullyQualifiedPath(field.getName(), 
currentPath)))
+                
.collect(CustomCollectors.toList(internalSchema.getFields().size()));
+        type =
+            
Types.requiredGroup().addFields(fields.stream().toArray(Type[]::new)).named(fieldName);
+        break;
+      default:
+        throw new UnsupportedSchemaTypeException(
+            "Encountered unhandled type during InternalSchema to parquet conversion: "
+                + internalType);
+    }
+    return type;
+  }
+}
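
A short sketch of driving the schema converter above in both directions (not part of the commit; the message type and field names are illustrative assumptions):

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;

    import org.apache.xtable.model.schema.InternalSchema;
    import org.apache.xtable.parquet.ParquetSchemaExtractor;

    public class SchemaExtractorExample {
      public static void main(String[] args) {
        ParquetSchemaExtractor extractor = ParquetSchemaExtractor.getInstance();
        // required int64 "id" plus optional string "name" in a message type "user"
        MessageType parquetSchema =
            Types.buildMessage()
                .required(PrimitiveTypeName.INT64)
                .named("id")
                .optional(PrimitiveTypeName.BINARY)
                .as(LogicalTypeAnnotation.stringType())
                .named("name")
                .named("user");
        // parquet -> canonical
        InternalSchema internal = extractor.toInternalSchema(parquetSchema, null);
        // canonical -> parquet
        Type converted = extractor.fromInternalSchema(internal, null);
        System.out.println(internal);
        System.out.println(converted);
      }
    }
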
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
new file mode 100644
index 00000000..8d02b392
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import lombok.Builder;
+import lombok.Value;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+@Value
+@Builder
+public class ParquetStatsExtractor {
+
+  private static final ParquetStatsExtractor INSTANCE = new 
ParquetStatsExtractor();
+
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+
+  // private static final InputPartitionFields partitions = null;
+
+  public static ParquetStatsExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public static List<ColumnStat> getColumnStatsForaFile(ParquetMetadata 
footer) {
+    return getStatsForFile(footer).values().stream()
+        .flatMap(List::stream)
+        .collect(Collectors.toList());
+  }
+
+  private static Optional<Long> getMaxFromColumnStats(List<ColumnStat> 
columnStats) {
+    return columnStats.stream()
+        .filter(entry -> entry.getField().getParentPath() == null)
+        .map(ColumnStat::getNumValues)
+        .filter(numValues -> numValues > 0)
+        .max(Long::compareTo);
+  }
+
+  public static Map<ColumnDescriptor, List<ColumnStat>> getStatsForFile(ParquetMetadata footer) {
+    MessageType schema = parquetMetadataExtractor.getSchema(footer);
+    List<ColumnChunkMetaData> columns =
+        footer.getBlocks().stream()
+            .flatMap(blockMetaData -> blockMetaData.getColumns().stream())
+            .collect(Collectors.toList());
+    Map<ColumnDescriptor, List<ColumnStat>> columnDescStats =
+        columns.stream()
+            .collect(
+                Collectors.groupingBy(
+                    columnMetaData ->
+                        
schema.getColumnDescription(columnMetaData.getPath().toArray()),
+                    Collectors.mapping(
+                        columnMetaData ->
+                            ColumnStat.builder()
+                                .field(
+                                    InternalField.builder()
+                                        
.name(columnMetaData.getPrimitiveType().getName())
+                                        .fieldId(
+                                            
columnMetaData.getPrimitiveType().getId() == null
+                                                ? null
+                                                : columnMetaData
+                                                    .getPrimitiveType()
+                                                    .getId()
+                                                    .intValue())
+                                        .parentPath(null)
+                                        .schema(
+                                            schemaExtractor.toInternalSchema(
+                                                
columnMetaData.getPrimitiveType(),
+                                                
columnMetaData.getPath().toDotString()))
+                                        .build())
+                                .numValues(columnMetaData.getValueCount())
+                                .totalSize(columnMetaData.getTotalSize())
+                                .range(
+                                    Range.vector(
+                                        
columnMetaData.getStatistics().genericGetMin(),
+                                        
columnMetaData.getStatistics().genericGetMax()))
+                                .build(),
+                        Collectors.toList())));
+    return columnDescStats;
+  }
+
+  /*  private static InputPartitionFields initPartitionInfo() {
+    return partitions;
+  }*/
+
+  public static InternalDataFile toInternalDataFile(Configuration hadoopConf, Path parentPath)
+      throws IOException {
+    // let the IOException propagate instead of swallowing it; a silently ignored
+    // failure here would only surface later as a NullPointerException
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileStatus file = fs.getFileStatus(parentPath);
+    // InputPartitionFields partitionInfo = initPartitionInfo();
+    ParquetMetadata footer = parquetMetadataExtractor.readParquetMetadata(hadoopConf, parentPath);
+    List<ColumnStat> columnStatsForAFile = getColumnStatsForaFile(footer);
+    // List<PartitionValue> partitionValues =
+    //     partitionExtractor.createPartitionValues(partitionInfo);
+    return InternalDataFile.builder()
+        .physicalPath(parentPath.toString())
+        .fileFormat(FileFormat.APACHE_PARQUET)
+        // .partitionValues(partitionValues)
+        .fileSizeBytes(file.getLen())
+        .recordCount(getMaxFromColumnStats(columnStatsForAFile).orElse(0L))
+        .columnStats(columnStatsForAFile)
+        .lastModified(file.getModificationTime())
+        .build();
+  }
+}
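
And a sketch of collecting file-level stats through the new entry point (not part of the commit; the path is an illustrative assumption and must point at an existing parquet file):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    import org.apache.xtable.model.storage.InternalDataFile;
    import org.apache.xtable.parquet.ParquetStatsExtractor;

    public class StatsExtractorExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // hypothetical parquet file on the local filesystem
        Path file = new Path("/tmp/data/part-00000.parquet");
        // bundles per-column stats with file size, record count, and modification time
        InternalDataFile dataFile = ParquetStatsExtractor.toInternalDataFile(conf, file);
        System.out.println(dataFile);
      }
    }
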
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
index 252b5b26..89460c40 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
@@ -127,6 +127,7 @@ public abstract class TestAbstractHudiTable
       throw new UncheckedIOException(ex);
     }
   }
+
   // Name of the table
   protected String tableName;
   // Base path for the table
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
index ce3b25bd..abbe7fe6 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
@@ -66,6 +66,7 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
   private HoodieJavaWriteClient<HoodieAvroPayload> writeClient;
 
   private final Configuration conf;
+
   /**
   * Create a test table instance for general testing. The table is created with the schema defined
   * in basic_schema.avsc which contains many data types to ensure they are handled correctly.
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java
index a25063c4..9475d503 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java
@@ -36,6 +36,7 @@ public class StubCatalog implements Catalog {
   public static void registerMock(String catalogName, Catalog catalog) {
     REGISTERED_MOCKS.put(catalogName, catalog);
   }
+
   // use a mocked catalog instance to more easily test
   private Catalog mockedCatalog;
 
diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
new file mode 100644
index 00000000..13d2299f
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.parquet.schema.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+
+public class TestParquetSchemaExtractor {
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+
+  @Test
+  public void testPrimitiveTypes() {
+
+    InternalSchema primitive1 =
+        
InternalSchema.builder().name("integer").dataType(InternalType.INT).build();
+    InternalSchema primitive2 =
+        
InternalSchema.builder().name("string").dataType(InternalType.STRING).build();
+
+    Map<InternalSchema.MetadataKey, Object> fixedDecimalMetadata = new 
HashMap<>();
+    fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 6);
+    fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 5);
+    InternalSchema decimalType =
+        InternalSchema.builder()
+            .name("decimal")
+            .dataType(InternalType.DECIMAL)
+            .isNullable(false)
+            .metadata(fixedDecimalMetadata)
+            .build();
+
+    Type stringPrimitiveType =
+        Types.required(PrimitiveTypeName.BINARY)
+            .as(LogicalTypeAnnotation.stringType())
+            .named("string");
+
+    Type intPrimitiveType =
+        Types.required(PrimitiveTypeName.INT32)
+            .as(LogicalTypeAnnotation.intType(32, false))
+            .named("integer");
+
+    Type decimalPrimitive =
+        Types.required(PrimitiveTypeName.INT32)
+            .as(LogicalTypeAnnotation.decimalType(5, 6))
+            .named("decimal");
+
+    Assertions.assertEquals(primitive1, 
schemaExtractor.toInternalSchema(intPrimitiveType, null));
+
+    Assertions.assertEquals(
+        primitive2, schemaExtractor.toInternalSchema(stringPrimitiveType, 
null));
+
+    Assertions.assertEquals(decimalType, 
schemaExtractor.toInternalSchema(decimalPrimitive, null));
+
+    // tests for timestamp and date
+    InternalSchema testDate =
+        
InternalSchema.builder().name("date").dataType(InternalType.DATE).isNullable(false).build();
+
+    Map<InternalSchema.MetadataKey, Object> millisMetadata =
+        Collections.singletonMap(
+            InternalSchema.MetadataKey.TIMESTAMP_PRECISION, 
InternalSchema.MetadataValue.MILLIS);
+    Map<InternalSchema.MetadataKey, Object> microsMetadata =
+        Collections.singletonMap(
+            InternalSchema.MetadataKey.TIMESTAMP_PRECISION, 
InternalSchema.MetadataValue.MICROS);
+    Map<InternalSchema.MetadataKey, Object> nanosMetadata =
+        Collections.singletonMap(
+            InternalSchema.MetadataKey.TIMESTAMP_PRECISION, 
InternalSchema.MetadataValue.NANOS);
+
+    InternalSchema testTimestampMillis =
+        InternalSchema.builder()
+            .name("timestamp_millis")
+            .dataType(InternalType.TIMESTAMP_NTZ)
+            .isNullable(false)
+            .metadata(millisMetadata)
+            .build();
+
+    InternalSchema testTimestampMicros =
+        InternalSchema.builder()
+            .name("timestamp_micros")
+            .dataType(InternalType.TIMESTAMP)
+            .isNullable(false)
+            .metadata(microsMetadata)
+            .build();
+
+    InternalSchema testTimestampNanos =
+        InternalSchema.builder()
+            .name("timestamp_nanos")
+            .dataType(InternalType.TIMESTAMP_NTZ)
+            .isNullable(false)
+            .metadata(nanosMetadata)
+            .build();
+
+    Type timestampMillisPrimitiveType =
+        Types.required(PrimitiveTypeName.INT64)
+            .as(LogicalTypeAnnotation.timestampType(false, 
LogicalTypeAnnotation.TimeUnit.MILLIS))
+            .named("timestamp_millis");
+    Type timestampNanosPrimitiveType =
+        Types.required(PrimitiveTypeName.INT64)
+            .as(LogicalTypeAnnotation.timestampType(false, 
LogicalTypeAnnotation.TimeUnit.NANOS))
+            .named("timestamp_nanos");
+    Assertions.assertEquals(
+        testTimestampMillis, 
schemaExtractor.toInternalSchema(timestampMillisPrimitiveType, null));
+    Assertions.assertEquals(
+        testTimestampNanos, 
schemaExtractor.toInternalSchema(timestampNanosPrimitiveType, null));
+
+    // test date
+
+    Type datePrimitiveType =
+        
Types.required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.dateType()).named("date");
+    Assertions.assertEquals(testDate, 
schemaExtractor.toInternalSchema(datePrimitiveType, null));
+  }
+
+  @Test
+  public void testGroupTypes() {
+
+    // map
+
+    InternalSchema internalMap =
+        InternalSchema.builder()
+            .name("map")
+            .isNullable(false)
+            .dataType(InternalType.MAP)
+            .fields(
+                Arrays.asList(
+                    InternalField.builder()
+                        .name("key")
+                        .parentPath("_one_field_value")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("key")
+                                .dataType(InternalType.FLOAT)
+                                .isNullable(false)
+                                .build())
+                        .defaultValue(null)
+                        .build(),
+                    InternalField.builder()
+                        .name("value")
+                        .parentPath("_one_field_value")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("value")
+                                .dataType(InternalType.INT)
+                                .isNullable(false)
+                                .build())
+                        .build()))
+            .build();
+
+    /* tests for fromInternalSchema() */
+
+    GroupType fromSimpleList =
+        Types.requiredList()
+            .element(
+                Types.required(PrimitiveTypeName.INT32)
+                    .as(LogicalTypeAnnotation.intType(32, false))
+                    .named(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME))
+            .named("my_list");
+
+    InternalSchema fromInternalList =
+        InternalSchema.builder()
+            .name("my_list")
+            .isNullable(false)
+            .dataType(InternalType.LIST)
+            .fields(
+                Arrays.asList(
+                    InternalField.builder()
+                        .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+                        .parentPath(null)
+                        .schema(
+                            InternalSchema.builder()
+                                
.name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+                                .dataType(InternalType.INT)
+                                .isNullable(false)
+                                .build())
+                        .build()))
+            .build();
+
+    GroupType fromTestMap =
+        Types.requiredMap()
+            .key(Types.primitive(PrimitiveTypeName.FLOAT, 
Repetition.REQUIRED).named("key"))
+            .value(
+                Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED)
+                    .as(LogicalTypeAnnotation.intType(32, false))
+                    .named("value"))
+            .named("map");
+    InternalSchema fromInternalMap =
+        InternalSchema.builder()
+            .name("map")
+            .isNullable(false)
+            .dataType(InternalType.MAP)
+            .fields(
+                Arrays.asList(
+                    InternalField.builder()
+                        .name("_one_field_key") // "key")
+                        .parentPath("_one_field_value")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("key")
+                                .dataType(InternalType.FLOAT)
+                                .isNullable(false)
+                                .build())
+                        .defaultValue(null)
+                        .build(),
+                    InternalField.builder()
+                        .name("_one_field_value") // "value")
+                        .parentPath("_one_field_value")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("value")
+                                .dataType(InternalType.INT)
+                                .isNullable(false)
+                                .build())
+                        .build()))
+            .build();
+
+    InternalSchema recordListElementSchema =
+        InternalSchema.builder()
+            .name("my_group")
+            .isNullable(false)
+            .fields(
+                Arrays.asList(
+                    InternalField.builder()
+                        .name("id")
+                        .parentPath("my_group")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("id")
+                                .dataType(InternalType.LONG)
+                                .isNullable(false)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("name")
+                        .parentPath("my_group")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("name")
+                                .dataType(InternalType.STRING)
+                                .isNullable(true)
+                                .build())
+                        .defaultValue(null)
+                        .build()))
+            .dataType(InternalType.RECORD)
+            .build();
+    InternalSchema internalSchema =
+        InternalSchema.builder()
+            .name("my_record")
+            .dataType(InternalType.RECORD)
+            .isNullable(true)
+            .fields(
+                Arrays.asList(
+                    InternalField.builder()
+                        .name("my_list")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("my_list")
+                                .isNullable(false)
+                                .dataType(InternalType.LIST)
+                                .fields(
+                                    Arrays.asList(
+                                        InternalField.builder()
+                                            
.name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+                                            .parentPath("my_list")
+                                            .schema(
+                                                InternalSchema.builder()
+                                                    .name("element")
+                                                    .dataType(InternalType.INT)
+                                                    .isNullable(true)
+                                                    .build())
+                                            .build()))
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("my_group")
+                        .schema(recordListElementSchema)
+                        .defaultValue(null)
+                        .build()))
+            .build();
+
+    Assertions.assertEquals(fromTestMap, 
schemaExtractor.fromInternalSchema(fromInternalMap, null));
+    Assertions.assertEquals(
+        fromSimpleList, schemaExtractor.fromInternalSchema(fromInternalList, null));
+
+    GroupType testGroupType =
+        Types.requiredGroup()
+            .required(PrimitiveTypeName.INT64)
+            .named("id")
+            .optional(PrimitiveTypeName.BINARY)
+            .as(LogicalTypeAnnotation.stringType())
+            .named("name")
+            .named("my_group");
+
+    GroupType testMap =
+        Types.requiredMap()
+            .key(Types.primitive(PrimitiveTypeName.FLOAT, Repetition.REQUIRED).named("key"))
+            .value(Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED).named("value"))
+            .named("map");
+    GroupType listType =
+        Types.requiredList()
+            .setElementType(
+                Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED).named("element"))
+            .named("my_list");
+    MessageType messageType =
+        Types.buildMessage()
+            .addField(listType)
+            .addField(testGroupType)
+            .named("my_record");
+
+    Assertions.assertEquals(internalMap, schemaExtractor.toInternalSchema(testMap, null));
+    Assertions.assertEquals(internalSchema, schemaExtractor.toInternalSchema(messageType, null));
+  }
+}
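
For reviewers who want to poke at the schema converter outside the test harness, a minimal round-trip sketch follows. It reuses only calls visible in the tests above (a null parent path for top-level types); the SchemaRoundTripSketch class name is hypothetical, and the declared return type of fromInternalSchema is an assumption:

    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;
    import org.apache.parquet.schema.Type;

    import org.apache.xtable.model.schema.InternalSchema;
    import org.apache.xtable.parquet.ParquetSchemaExtractor;

    public class SchemaRoundTripSketch { // hypothetical, not part of this PR
      public static void main(String[] args) {
        ParquetSchemaExtractor extractor = ParquetSchemaExtractor.getInstance();
        // Parse a simple Parquet schema and map it into XTable's internal model.
        MessageType parquetSchema =
            MessageTypeParser.parseMessageType("message simple { required int32 id; }");
        InternalSchema internal = extractor.toInternalSchema(parquetSchema, null);
        // Map it back; the tests above assert both directions against hand-built types.
        Type roundTripped = extractor.fromInternalSchema(internal, null); // assumed return type
        System.out.println(roundTripped);
      }
    }
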
diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
new file mode 100644
index 00000000..7522c292
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import static org.apache.parquet.column.Encoding.BIT_PACKED;
+import static org.apache.parquet.column.Encoding.PLAIN;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.column.statistics.BooleanStatistics;
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type.Repetition;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+public class TestParquetStatsExtractor {
+
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+
+  @TempDir static java.nio.file.Path tempDir;
+
+  public static List<ColumnStat> initBooleanFileTest(File file) throws IOException {
+    // create the parquet file by parsing a schema
+    Path path = new Path(file.toURI());
+    Configuration configuration = new Configuration();
+
+    MessageType schema =
+        MessageTypeParser.parseMessageType("message m { required group a 
{required boolean b;}}");
+    String[] columnPath = {"a", "b"};
+    ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
+    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+    BooleanStatistics stats = new BooleanStatistics();
+    stats.updateStats(true);
+    stats.updateStats(false);
+
+    // write the boolean column file
+
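+    // ParquetFileWriter's low-level protocol, as used below: start(), then for
+    // each row group startBlock(recordCount), startColumn(descriptor, valueCount,
+    // codec), one or more writeDataPage(...) calls, endColumn(), endBlock(),
+    // and finally end(extraMetadata).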
+    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+    w.start();
+    w.startBlock(3);
+    w.startColumn(c1, 5, codec);
+    w.writeDataPage(2, 4, BytesInput.fromInt(1), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.fromInt(0), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.startBlock(4);
+    w.startColumn(c1, 8, codec);
+
+    w.writeDataPage(7, 4, BytesInput.fromInt(0), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap<String, String>());
+
+    // reconstruct the stats for the InternalDataFile testing object
+
+    boolean minStat = stats.genericGetMin();
+    boolean maxStat = stats.genericGetMax();
+    PrimitiveType primitiveType =
+        new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BOOLEAN, "b");
+    List<Integer> col1NumValTotSize =
+        new ArrayList<>(Arrays.asList(5, 8)); // value counts passed to startColumn per block
+    List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(54, 27));
+    List<ColumnStat> testColumnStats = new ArrayList<>();
+    String[] columnDotPath = {"a.b", "a.b"};
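+    // one ColumnStat is expected per column chunk (one per row group written
+    // above), hence the repeated dot path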
+    for (int i = 0; i < columnDotPath.length; i++) {
+      testColumnStats.add(
+          ColumnStat.builder()
+              .field(
+                  InternalField.builder()
+                      .name(primitiveType.getName())
+                      .parentPath(null)
+                      .schema(schemaExtractor.toInternalSchema(primitiveType, columnDotPath[i]))
+                      .build())
+              .numValues(col1NumValTotSize.get(i))
+              .totalSize(col2NumValTotSize.get(i))
+              .range(Range.vector(minStat, maxStat))
+              .build());
+    }
+
+    return testColumnStats;
+  }
+
+  public static List<ColumnStat> initStringFileTest(File file) throws IOException {
+    // create the parquet file by parsing a schema
+    Path path = new Path(file.toURI());
+    Configuration configuration = new Configuration();
+
+    MessageType schema =
+        MessageTypeParser.parseMessageType(
+            "message m { required group a {required fixed_len_byte_array(10) 
b;}}");
+    String[] columnPath = {"a", "b"};
+    ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
+    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+    BinaryStatistics stats = new BinaryStatistics();
+    stats.updateStats(Binary.fromString("1"));
+    stats.updateStats(Binary.fromString("2"));
+    stats.updateStats(Binary.fromString("5"));
+
+    byte[] bytes1 = "First string".getBytes();
+    byte[] bytes2 = "Second string".getBytes();
+
+    // write the string (fixed_len_byte_array) column file
+
+    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+    w.start();
+    w.startBlock(3);
+    w.startColumn(c1, 5, codec);
+
+    w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(bytes2), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.startBlock(4);
+    w.startColumn(c1, 8, codec);
+
+    w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap<String, String>());
+
+    // reconstruct the stats for the InternalDataFile testing object
+    BinaryStatistics statsClone = new BinaryStatistics();
+    Binary originalBinary1 = Binary.fromString("1");
+    Binary originalBinary2 = Binary.fromString("2");
+    Binary originalBinary5 = Binary.fromString("5");
+    statsClone.updateStats(Binary.fromByteArray(originalBinary1.getBytes()));
+    statsClone.updateStats(Binary.fromByteArray(originalBinary2.getBytes()));
+    statsClone.updateStats(Binary.fromByteArray(originalBinary5.getBytes()));
+
+    Binary minStat = statsClone.genericGetMin();
+    Binary maxStat = statsClone.genericGetMax();
+    PrimitiveType primitiveType =
+        new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, "b");
+    List<Integer> col1NumValTotSize = new ArrayList<>(Arrays.asList(5, 8));
+    List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(71, 36));
+    List<ColumnStat> testColumnStats = new ArrayList<>();
+    String[] columnDotPath = {"a.b", "a.b"};
+    for (int i = 0; i < columnDotPath.length; i++) {
+      testColumnStats.add(
+          ColumnStat.builder()
+              .field(
+                  InternalField.builder()
+                      .name(primitiveType.getName())
+                      .parentPath(null)
+                      .schema(schemaExtractor.toInternalSchema(primitiveType, columnDotPath[i]))
+                      .build())
+              .numValues(col1NumValTotSize.get(i))
+              .totalSize(col2NumValTotSize.get(i))
+              .range(Range.vector(minStat, maxStat))
+              .build());
+    }
+
+    return testColumnStats;
+  }
+
+  public static List<ColumnStat> initBinaryFileTest(File file) throws IOException {
+    // create the parquet file by parsing a schema
+    Path path = new Path(file.toURI());
+    Configuration configuration = new Configuration();
+
+    MessageType schema =
+        MessageTypeParser.parseMessageType("message m { required group a 
{required binary b;}}");
+    String[] columnPath = {"a", "b"};
+    ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
+
+    byte[] bytes1 = {0, 1, 2, 3};
+    byte[] bytes2 = {2, 3, 4, 5};
+    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+    BinaryStatistics stats = new BinaryStatistics();
+    stats.updateStats(Binary.fromString("1"));
+    stats.updateStats(Binary.fromString("2"));
+    stats.updateStats(Binary.fromString("5"));
+
+    // to simplify the test, the same stats are reused for both column chunks
+    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+    w.start();
+    w.startBlock(3);
+    w.startColumn(c1, 5, codec);
+
+    w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+
+    w.writeDataPage(3, 4, BytesInput.from(bytes2), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+
+    w.endColumn();
+    w.endBlock();
+    w.startBlock(4);
+
+    w.startColumn(c1, 1, codec);
+    w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap<String, String>());
+
+    // reconstruct the stats for the InternalDataFile testing object
+    BinaryStatistics statsClone = new BinaryStatistics();
+    Binary originalBinary1 = Binary.fromString("1");
+    Binary originalBinary2 = Binary.fromString("2");
+    Binary originalBinary5 = Binary.fromString("5");
+    statsClone.updateStats(Binary.fromByteArray(originalBinary1.getBytes()));
+    statsClone.updateStats(Binary.fromByteArray(originalBinary2.getBytes()));
+    statsClone.updateStats(Binary.fromByteArray(originalBinary5.getBytes()));
+
+    Binary minStat = statsClone.genericGetMin();
+    Binary maxStat = statsClone.genericGetMax();
+    PrimitiveType primitiveType =
+        new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "b");
+    List<Integer> col1NumValTotSize = new ArrayList<>(Arrays.asList(5, 1));
+    List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(54, 27));
+    List<ColumnStat> testColumnStats = new ArrayList<>();
+    String[] columnDotPath = {"a.b", "a.b"};
+    for (int i = 0; i < columnDotPath.length; i++) {
+      testColumnStats.add(
+          ColumnStat.builder()
+              .field(
+                  InternalField.builder()
+                      .name(primitiveType.getName())
+                      .parentPath(null)
+                      .schema(schemaExtractor.toInternalSchema(primitiveType, columnDotPath[i]))
+                      .build())
+              .numValues(col1NumValTotSize.get(i))
+              .totalSize(col2NumValTotSize.get(i))
+              .range(Range.vector(minStat, maxStat))
+              .build());
+    }
+
+    return testColumnStats;
+  }
+
+  public static List<ColumnStat> initIntFileTest(File file) throws IOException {
+    // create the parquet file by parsing a schema
+    Path path = new Path(file.toURI());
+    Configuration configuration = new Configuration();
+
+    MessageType schema =
+        MessageTypeParser.parseMessageType("message m { required group a 
{required int32 b;}}");
+    String[] columnPath = {"a", "b"};
+    ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
+
+    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+
+    IntStatistics stats = new IntStatistics();
+    stats.updateStats(1);
+    stats.updateStats(2);
+    stats.updateStats(5);
+
+    // to simplify the test, the same stats are reused for both column chunks
+    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+    w.start();
+    w.startBlock(3);
+
+    w.startColumn(c1, 2, codec);
+
+    w.writeDataPage(3, 3, BytesInput.fromInt(3), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+
+    w.writeDataPage(3, 3, BytesInput.fromInt(2), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.startBlock(4);
+
+    w.startColumn(c1, 1, codec);
+
+    w.writeDataPage(3, 3, BytesInput.fromInt(1), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap<String, String>());
+
+    // reconstruct the stats for the InternalDataFile testing object
+
+    Integer minStat = stats.genericGetMin();
+    Integer maxStat = stats.genericGetMax();
+    PrimitiveType primitiveType =
+        new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.INT32, "b");
+    List<Integer> col1NumValTotSize = new ArrayList<>(Arrays.asList(2, 1));
+    List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(54, 27));
+    List<ColumnStat> testColumnStats = new ArrayList<>();
+    String[] columnDotPath = {"a.b", "a.b"};
+    for (int i = 0; i < columnDotPath.length; i++) {
+      testColumnStats.add(
+          ColumnStat.builder()
+              .field(
+                  InternalField.builder()
+                      .name(primitiveType.getName())
+                      .parentPath(null)
+                      .schema(schemaExtractor.toInternalSchema(primitiveType, columnDotPath[i]))
+                      .build())
+              .numValues(col1NumValTotSize.get(i))
+              .totalSize(col2NumValTotSize.get(i))
+              .range(Range.vector(minStat, maxStat))
+              .build());
+    }
+
+    return testColumnStats;
+  }
+
+  @Test
+  public void testInternalDataFileStringStat() throws IOException {
+
+    Configuration configuration = new Configuration();
+
+    java.nio.file.Path path = tempDir.resolve("parquet-test-string-file");
+    File file = path.toFile();
+    file.deleteOnExit();
+    List<ColumnStat> testColumnStats = initStringFileTest(file);
+    Path hadoopPath = new Path(file.toURI());
+    // statsExtractor toInternalDataFile testing
+    InternalDataFile internalDataFile =
+        ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+    InternalDataFile testInternalFile =
+        InternalDataFile.builder()
+            .physicalPath(
+                "file:"
+                    .concat(
+                        file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
+            .columnStats(testColumnStats)
+            .fileFormat(FileFormat.APACHE_PARQUET)
+            .lastModified(file.lastModified())
+            .fileSizeBytes(file.length())
+            .recordCount(8)
+            .build();
+
+    Assertions.assertEquals(testInternalFile, internalDataFile);
+  }
+
+  @Test
+  public void testInternalDataFileBinaryStat() throws IOException {
+
+    Configuration configuration = new Configuration();
+
+    java.nio.file.Path path = tempDir.resolve("parquet-test-binary-file");
+    File file = path.toFile();
+    file.deleteOnExit();
+    List<ColumnStat> testColumnStats = initBinaryFileTest(file);
+    Path hadoopPath = new Path(file.toURI());
+    // statsExtractor toInternalDataFile testing
+    InternalDataFile internalDataFile =
+        ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+    InternalDataFile testInternalFile =
+        InternalDataFile.builder()
+            .physicalPath(
+                "file:"
+                    .concat(
+                        file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
+            .columnStats(testColumnStats)
+            .fileFormat(FileFormat.APACHE_PARQUET)
+            .lastModified(file.lastModified())
+            .fileSizeBytes(file.length())
+            .recordCount(5)
+            .build();
+
+    Assertions.assertEquals(testInternalFile, internalDataFile);
+  }
+
+  @Test
+  public void testInternalDataFileIntStat() throws IOException {
+
+    Configuration configuration = new Configuration();
+    java.nio.file.Path path = tempDir.resolve("parquet-test-int-file");
+    File file = path.toFile();
+    file.deleteOnExit();
+    List<ColumnStat> testColumnStats = initIntFileTest(file);
+    Path hadoopPath = new Path(file.toURI());
+    // statsExtractor toInternalDataFile testing
+    InternalDataFile internalDataFile =
+        ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+    InternalDataFile testInternalFile =
+        InternalDataFile.builder()
+            .physicalPath(
+                "file:"
+                    .concat(
+                        file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
+            .columnStats(testColumnStats)
+            .fileFormat(FileFormat.APACHE_PARQUET)
+            .lastModified(file.lastModified())
+            .fileSizeBytes(file.length())
+            .recordCount(2)
+            .build();
+
+    Assertions.assertEquals(testInternalFile, internalDataFile);
+  }
+
+  @Test
+  public void testInternalDataFileBooleanStat() throws IOException {
+    Configuration configuration = new Configuration();
+
+    java.nio.file.Path path = tempDir.resolve("parquet-test-boolean-file");
+    File file = path.toFile();
+    file.deleteOnExit();
+
+    List<ColumnStat> testColumnStats = initBooleanFileTest(file);
+    Path hadoopPath = new Path(file.toURI());
+    // statsExtractor toInternalDataFile testing
+    InternalDataFile internalDataFile =
+        ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+    InternalDataFile testInternalFile =
+        InternalDataFile.builder()
+            .physicalPath(
+                "file:"
+                    .concat(
+                        file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
+            .columnStats(testColumnStats)
+            .fileFormat(FileFormat.APACHE_PARQUET)
+            .lastModified(file.lastModified())
+            .fileSizeBytes(file.length())
+            .recordCount(8)
+            .build();
+
+    Assertions.assertEquals(testInternalFile, internalDataFile);
+  }
+}
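
For orientation, the entry point these tests exercise can be run against any local Parquet file roughly as follows. This is a sketch only: the StatsExtractorSketch class name is hypothetical, and the getColumnStats()/getField()/getRange() accessors are assumed Lombok-generated getters on the model classes, which this diff does not show:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    import org.apache.xtable.model.stat.ColumnStat;
    import org.apache.xtable.model.storage.InternalDataFile;
    import org.apache.xtable.parquet.ParquetStatsExtractor;

    public class StatsExtractorSketch { // hypothetical, not part of this PR
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path parquetFile = new Path(args[0]); // path to an existing Parquet file
        InternalDataFile dataFile = ParquetStatsExtractor.toInternalDataFile(conf, parquetFile);
        // Each ColumnStat pairs a field with its value count, byte size, and min/max range.
        for (ColumnStat stat : dataFile.getColumnStats()) {
          System.out.println(stat.getField().getName() + " -> " + stat.getRange());
        }
      }
    }
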
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
index 43549d1b..e04d6985 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
@@ -297,11 +297,13 @@ public class RunCatalogSync {
      * necessary connection and access details for describing and listing tables
      */
     ExternalCatalogConfig sourceCatalog;
+
     /**
     * Defines configuration for one or more target catalogs, to which XTable will write or update
      * tables. Unlike the source, these catalogs must be writable
      */
     List<ExternalCatalogConfig> targetCatalogs;
+
    /** A list of datasets that specify how a source table maps to one or more target tables. */
     List<Dataset> datasets;
 
@@ -314,6 +316,7 @@ public class RunCatalogSync {
     public static class Dataset {
       /** Identifies the source table in sourceCatalog. */
       SourceTableIdentifier sourceCatalogTableIdentifier;
+
      /** A list of one or more targets that this source table should be written to. */
       List<TargetTableIdentifier> targetCatalogTableIdentifiers;
     }
@@ -324,6 +327,7 @@ public class RunCatalogSync {
     public static class SourceTableIdentifier {
       /** Specifies the table identifier in the source catalog. */
       TableIdentifier tableIdentifier;
+
       /**
       * (Optional) Provides direct storage details such as a table’s base path (like an S3
       * location) and the partition specification. This allows reading from a source even if it is
@@ -341,11 +345,13 @@ public class RunCatalogSync {
        * updated
        */
       String catalogId;
+
       /**
        * The target table format (e.g., DELTA, HUDI, ICEBERG), specifying how the data will be
        * stored at the target.
        */
       String tableFormat;
+
       /** Specifies the table identifier in the target catalog. */
       TableIdentifier tableIdentifier;
     }
@@ -359,6 +365,7 @@ public class RunCatalogSync {
        * HierarchicalTableIdentifier}
        */
       String hierarchicalId;
+
       /** Specifies the partition spec of the table */
       String partitionSpec;
     }
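
The hunks above are formatting-only (blank lines between each Javadoc and its field), but the Javadoc describes the catalog-sync config shape. As a reference point, a config covering those fields could be bound roughly as shown below. The DatasetConfig class name, the Jackson YAML binding, and the ExternalCatalogConfig keys are assumptions; only the documented field names (sourceCatalog, targetCatalogs, datasets, tableFormat, hierarchicalId) come from the code above:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

    public class CatalogSyncConfigSketch { // hypothetical, not part of this PR
      public static void main(String[] args) throws Exception {
        String yaml =
            "sourceCatalog:\n"
                + "  catalogId: source\n" // ExternalCatalogConfig keys assumed
                + "targetCatalogs:\n"
                + "  - catalogId: target\n"
                + "datasets:\n"
                + "  - sourceCatalogTableIdentifier:\n"
                + "      tableIdentifier:\n"
                + "        hierarchicalId: db.my_table\n"
                + "    targetCatalogTableIdentifiers:\n"
                + "      - catalogId: target\n"
                + "        tableFormat: ICEBERG\n"
                + "        tableIdentifier:\n"
                + "          hierarchicalId: db.my_table\n";
        // Bind the YAML onto the config classes documented in the hunks above;
        // DatasetConfig is the assumed enclosing class for those fields.
        DatasetConfig config = new ObjectMapper(new YAMLFactory()).readValue(yaml, DatasetConfig.class);
        System.out.println(config);
      }
    }
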
