This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a1c5b1a [755] Support column stats for paimon (#767)
7a1c5b1a is described below

commit 7a1c5b1a1b1c834ff088465b08333f289b1d7cfa
Author: Mao <[email protected]>
AuthorDate: Tue Jan 20 05:54:05 2026 +1100

    [755] Support column stats for paimon (#767)
    
    * paimon: wip column stats
    
    * paimon column stats: fix tests
    
    * paimon: fixing some tests
    
    * paimon: fixing more tests
    
    * paimon: fix column stats column checks
    
    * paimon: stats: handling timestamp as long
    
    * paimon: add to xtable-utilities
    
    * paimon: lint
    
    * paimon: ticked version to 1.3.1
    
    * paimon: reducing logging
    
    * paimon: annotating assumptions with comments
    
    * paimon: expanded tests for stats extractor
    
    * paimon: stats extractor to its own class
    
    * paimon: handling deleted stats
    
    * paimon: spotless
    
    * paimon: fix tests
    
    * paimon: tests remove wildcard imports
    
    * paimon: assumptions validated
    
    * paimon: code review changes
    
    * paimon: added test case for lack of stats for complex fields
    
    * paimon: extended tests and updated test TODO with github issue reference
    
    * paimon: fix tests
    
    * pin catalyst version
    
    * paimon: fix tests in xtable-utilities
    
    ---------
    
    Co-authored-by: Timothy Brown <[email protected]>
---
 pom.xml                                            |   8 +-
 .../xtable/paimon/PaimonDataFileExtractor.java     |  13 +-
 .../apache/xtable/paimon/PaimonStatsExtractor.java | 187 +++++++
 .../java/org/apache/xtable/TestPaimonTable.java    |  64 ++-
 .../xtable/paimon/TestPaimonConversionSource.java  |  10 +-
 .../xtable/paimon/TestPaimonDataFileExtractor.java |  85 ++--
 .../xtable/paimon/TestPaimonStatsExtractor.java    | 539 +++++++++++++++++++++
 .../main/resources/xtable-conversion-defaults.yaml |   4 +-
 .../org/apache/xtable/utilities/TestRunSync.java   |   9 +-
 9 files changed, 836 insertions(+), 83 deletions(-)

diff --git a/pom.xml b/pom.xml
index 855894ec..92845db6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,7 @@
         <spark.version.prefix>3.4</spark.version.prefix>
         <iceberg.version>1.4.2</iceberg.version>
         <delta.version>2.4.0</delta.version>
-        <paimon.version>1.2.0</paimon.version>
+        <paimon.version>1.3.1</paimon.version>
         <jackson.version>2.18.2</jackson.version>
         <spotless.version>2.43.0</spotless.version>
         <apache.rat.version>0.16.1</apache.rat.version>
@@ -374,6 +374,12 @@
                 <version>${spark.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>org.apache.spark</groupId>
+                <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+                <version>${spark.version}</version>
+                <scope>provided</scope>
+            </dependency>
 
             <dependency>
                 <groupId>commons-cli</groupId>
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
index 4555b0cf..e452c85b 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
@@ -19,7 +19,6 @@
 package org.apache.xtable.paimon;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -29,7 +28,6 @@ import java.util.Set;
 import lombok.extern.log4j.Log4j2;
 
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
@@ -39,7 +37,6 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 
 import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.stat.ColumnStat;
 import org.apache.xtable.model.storage.InternalDataFile;
 import org.apache.xtable.model.storage.InternalFilesDiff;
 
@@ -49,6 +46,8 @@ public class PaimonDataFileExtractor {
   private final PaimonPartitionExtractor partitionExtractor =
       PaimonPartitionExtractor.getInstance();
 
+  private final PaimonStatsExtractor statsExtractor = 
PaimonStatsExtractor.getInstance();
+
   private static final PaimonDataFileExtractor INSTANCE = new 
PaimonDataFileExtractor();
 
   public static PaimonDataFileExtractor getInstance() {
@@ -84,7 +83,7 @@ public class PaimonDataFileExtractor {
         .recordCount(entry.file().rowCount())
         .partitionValues(
             partitionExtractor.toPartitionValues(table, entry.partition(), 
internalSchema))
-        .columnStats(toColumnStats(entry.file()))
+        .columnStats(statsExtractor.extractColumnStats(entry.file(), 
internalSchema))
         .build();
   }
 
@@ -101,12 +100,6 @@ public class PaimonDataFileExtractor {
     }
   }
 
-  private List<ColumnStat> toColumnStats(DataFileMeta file) {
-    // TODO: Implement logic to extract column stats from the file meta
-    // https://github.com/apache/incubator-xtable/issues/755
-    return Collections.emptyList();
-  }
-
   /**
    * Extracts file changes (added and removed files) from delta manifests for 
a given snapshot. This
    * method reads only the delta manifests which contain the changes 
introduced in this specific
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonStatsExtractor.java 
b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonStatsExtractor.java
new file mode 100644
index 00000000..092061db
--- /dev/null
+++ 
b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonStatsExtractor.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.paimon.data.BinaryArray;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.types.TimestampType;
+
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.Range;
+
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PaimonStatsExtractor {
+  private static final PaimonStatsExtractor INSTANCE = new 
PaimonStatsExtractor();
+
+  public static PaimonStatsExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public List<ColumnStat> extractColumnStats(DataFileMeta file, InternalSchema 
internalSchema) {
+    List<ColumnStat> columnStats = new ArrayList<>();
+    Map<String, InternalField> fieldMap =
+        internalSchema.getAllFields().stream()
+            .collect(Collectors.toMap(InternalField::getPath, f -> f));
+
+    // stats for all columns are present in valueStats, we can safely ignore 
file.keyStats()
+    SimpleStats valueStats = file.valueStats();
+    if (valueStats != null) {
+      List<String> colNames = file.valueStatsCols();
+      if (colNames == null) {
+        // if column names are not present, then stats are being collected for 
all columns
+        colNames =
+            internalSchema.getAllFields().stream()
+                .map(InternalField::getPath)
+                .collect(Collectors.toList());
+      }
+
+      if (colNames.size() != valueStats.minValues().getFieldCount()) {
+        // paranoia check - this should never happen, but if the code reaches 
here, then there is a
+        // bug! Please file a bug report
+        throw new ReadException(
+            String.format(
+                "Mismatch between column stats names and values arity: 
names=%d, values=%d",
+                colNames.size(), valueStats.minValues().getFieldCount()));
+      }
+
+      collectColumnStats(columnStats, valueStats, colNames, fieldMap, 
file.rowCount());
+    }
+
+    return columnStats;
+  }
+
+  private void collectColumnStats(
+      List<ColumnStat> columnStats,
+      SimpleStats stats,
+      List<String> colNames,
+      Map<String, InternalField> fieldMap,
+      long rowCount) {
+
+    BinaryRow minValues = stats.minValues();
+    BinaryRow maxValues = stats.maxValues();
+    BinaryArray nullCounts = stats.nullCounts();
+
+    for (int i = 0; i < colNames.size(); i++) {
+      String colName = colNames.get(i);
+      InternalField field = fieldMap.get(colName);
+      if (field == null) {
+        continue;
+      }
+
+      // Check if we already have stats for this field
+      boolean alreadyExists =
+          columnStats.stream().anyMatch(cs -> 
cs.getField().getPath().equals(colName));
+      if (alreadyExists) {
+        continue;
+      }
+
+      InternalType type = field.getSchema().getDataType();
+      Object min = getValue(minValues, i, type, field.getSchema());
+      Object max = getValue(maxValues, i, type, field.getSchema());
+      long nullCount = (nullCounts != null && i < nullCounts.size()) ? 
nullCounts.getLong(i) : 0L;
+
+      columnStats.add(
+          ColumnStat.builder()
+              .field(field)
+              .range(min != null && max != null ? Range.vector(min, max) : 
null)
+              .numNulls(nullCount)
+              .numValues(rowCount)
+              .build());
+    }
+  }
+
+  private Object getValue(BinaryRow row, int index, InternalType type, 
InternalSchema fieldSchema) {
+    if (row.isNullAt(index)) {
+      return null;
+    }
+    switch (type) {
+      case BOOLEAN:
+        return row.getBoolean(index);
+      case INT:
+      case DATE:
+        return row.getInt(index);
+      case LONG:
+        return row.getLong(index);
+      case TIMESTAMP:
+      case TIMESTAMP_NTZ:
+        int tsPrecision;
+        InternalSchema.MetadataValue tsPrecisionEnum =
+            (InternalSchema.MetadataValue)
+                
fieldSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION);
+        if (tsPrecisionEnum == InternalSchema.MetadataValue.MILLIS) {
+          tsPrecision = 3;
+        } else if (tsPrecisionEnum == InternalSchema.MetadataValue.MICROS) {
+          tsPrecision = 6;
+        } else if (tsPrecisionEnum == InternalSchema.MetadataValue.NANOS) {
+          tsPrecision = 9;
+        } else {
+          log.warn(
+              "Field idx={}, name={} does not have 
MetadataKey.TIMESTAMP_PRECISION set, defaulting to default precision",
+              index,
+              fieldSchema.getName());
+          tsPrecision = TimestampType.DEFAULT_PRECISION;
+        }
+        Timestamp ts = row.getTimestamp(index, tsPrecision);
+
+        // according to docs for org.apache.xtable.model.stat.Range, timestamp 
is stored as millis
+        // or micros - even if precision is higher than micros, return micros
+        if (tsPrecisionEnum == InternalSchema.MetadataValue.MILLIS) {
+          return ts.getMillisecond();
+        } else {
+          return ts.toMicros();
+        }
+      case FLOAT:
+        return row.getFloat(index);
+      case DOUBLE:
+        return row.getDouble(index);
+      case STRING:
+      case ENUM:
+        return row.getString(index).toString();
+      case DECIMAL:
+        int precision =
+            (int) 
fieldSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION);
+        int scale = (int) 
fieldSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE);
+        return row.getDecimal(index, precision, scale).toBigDecimal();
+      default:
+        log.warn(
+            "Handling of {}-type stats for column idx={}, name={} is not yet 
implemented, skipping stats for this column",
+            type,
+            index,
+            fieldSchema.getName());
+        return null;
+    }
+  }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java 
b/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
index 55102007..1fb9314c 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
@@ -70,9 +70,21 @@ public class TestPaimonTable implements 
GenericTable<GenericRow, String> {
       Path tempDir,
       Configuration hadoopConf,
       boolean additionalColumns) {
+
+    Schema schema = buildGenericSchema(partitionField, additionalColumns);
+    return createTable(tableName, partitionField, tempDir, hadoopConf, 
additionalColumns, schema);
+  }
+
+  public static GenericTable<GenericRow, String> createTable(
+      String tableName,
+      String partitionField,
+      Path tempDir,
+      Configuration hadoopConf,
+      boolean additionalColumns,
+      Schema schema) {
     String basePath = initBasePath(tempDir, tableName);
     Catalog catalog = createFilesystemCatalog(basePath, hadoopConf);
-    FileStoreTable paimonTable = createTable(catalog, partitionField, 
additionalColumns);
+    FileStoreTable paimonTable = createTable(catalog, tableName, schema);
 
     System.out.println(
         "Initialized Paimon test table at base path: "
@@ -90,12 +102,10 @@ public class TestPaimonTable implements 
GenericTable<GenericRow, String> {
     return CatalogFactory.createCatalog(context);
   }
 
-  public static FileStoreTable createTable(
-      Catalog catalog, String partitionField, boolean additionalColumns) {
+  public static FileStoreTable createTable(Catalog catalog, String tableName, 
Schema schema) {
     try {
       catalog.createDatabase("test_db", true);
-      Identifier identifier = Identifier.create("test_db", "test_table");
-      Schema schema = buildSchema(partitionField, additionalColumns);
+      Identifier identifier = Identifier.create("test_db", tableName);
       catalog.createTable(identifier, schema, true);
       return (FileStoreTable) catalog.getTable(identifier);
     } catch (Exception e) {
@@ -103,7 +113,7 @@ public class TestPaimonTable implements 
GenericTable<GenericRow, String> {
     }
   }
 
-  private static Schema buildSchema(String partitionField, boolean 
additionalColumns) {
+  private static Schema buildGenericSchema(String partitionField, boolean 
additionalColumns) {
     Schema.Builder builder =
         Schema.newBuilder()
             .primaryKey("id")
@@ -116,7 +126,8 @@ public class TestPaimonTable implements 
GenericTable<GenericRow, String> {
             .column("description", DataTypes.VARCHAR(255))
             .option("bucket", "1")
             .option("bucket-key", "id")
-            .option("full-compaction.delta-commits", "1");
+            .option("full-compaction.delta-commits", "1")
+            .option("metadata.stats-mode", "full");
 
     if (partitionField != null) {
       builder
@@ -178,20 +189,12 @@ public class TestPaimonTable implements 
GenericTable<GenericRow, String> {
   }
 
   private List<GenericRow> insertRecordsToPartition(int numRows, String 
partitionValue) {
-    BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
-    try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
-      List<GenericRow> rows = new ArrayList<>(numRows);
-      for (int i = 0; i < numRows; i++) {
-        GenericRow row = buildGenericRow(i, paimonTable.schema(), 
partitionValue);
-        writer.write(row);
-        rows.add(row);
-      }
-      commitWrites(batchWriteBuilder, writer);
-      compactTable();
-      return rows;
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to insert rows into Paimon table", e);
+    List<GenericRow> rows = new ArrayList<>(numRows);
+    for (int i = 0; i < numRows; i++) {
+      rows.add(buildGenericRow(i, paimonTable.schema(), partitionValue));
     }
+    writeRows(paimonTable, rows);
+    return rows;
   }
 
   @Override
@@ -224,8 +227,12 @@ public class TestPaimonTable implements 
GenericTable<GenericRow, String> {
   }
 
   private void compactTable() {
-    BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
-    SnapshotReader snapshotReader = paimonTable.newSnapshotReader();
+    compactTable(paimonTable);
+  }
+
+  public static void compactTable(FileStoreTable table) {
+    BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
+    SnapshotReader snapshotReader = table.newSnapshotReader();
     try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
       for (BucketEntry bucketEntry : snapshotReader.bucketEntries()) {
         writer.compact(bucketEntry.partition(), bucketEntry.bucket(), true);
@@ -236,6 +243,19 @@ public class TestPaimonTable implements 
GenericTable<GenericRow, String> {
     }
   }
 
+  public static void writeRows(FileStoreTable table, List<GenericRow> rows) {
+    BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
+    try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
+      for (GenericRow row : rows) {
+        writer.write(row);
+      }
+      commitWrites(batchWriteBuilder, writer);
+      compactTable(table);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to write rows into Paimon table", e);
+    }
+  }
+
   private static void commitWrites(BatchWriteBuilder batchWriteBuilder, 
BatchTableWrite writer)
       throws Exception {
     BatchTableCommit commit = batchWriteBuilder.newCommit();
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
 
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
index 5e28e010..71b9022a 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
@@ -18,7 +18,13 @@
  
 package org.apache.xtable.paimon;
 
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Path;
 import java.time.Instant;
@@ -100,7 +106,7 @@ public class TestPaimonConversionSource {
     InternalTable result = unpartitionedSource.getTable(snapshot);
 
     assertNotNull(result);
-    assertEquals("test_table", result.getName());
+    assertEquals("unpartitioned_table", result.getName());
     assertEquals(TableFormat.PAIMON, result.getTableFormat());
     assertNotNull(result.getReadSchema());
     assertEquals(DataLayoutStrategy.HIVE_STYLE_PARTITION, 
result.getLayoutStrategy());
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
 
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
index 0f3ed30d..cd528361 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
@@ -24,8 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Path;
-import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.paimon.Snapshot;
@@ -34,30 +34,30 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.xtable.TestPaimonTable;
-import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.schema.InternalType;
 import org.apache.xtable.model.storage.InternalDataFile;
 import org.apache.xtable.model.storage.InternalFilesDiff;
 
 public class TestPaimonDataFileExtractor {
   private static final PaimonDataFileExtractor extractor = 
PaimonDataFileExtractor.getInstance();
+  private static final PaimonSchemaExtractor schemaExtractor = 
PaimonSchemaExtractor.getInstance();
 
   @TempDir private Path tempDir;
   private TestPaimonTable testTable;
   private FileStoreTable paimonTable;
-  private InternalSchema testSchema;
 
   @Test
   void testToInternalDataFilesWithUnpartitionedTable() {
     createUnpartitionedTable();
+    InternalSchema schema = 
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
+    assertEquals(1, schema.getRecordKeyFields().size());
 
     // Insert some data to create files
     testTable.insertRows(5);
 
     List<InternalDataFile> result =
         extractor.toInternalDataFiles(
-            paimonTable, paimonTable.snapshotManager().latestSnapshot(), 
testSchema);
+            paimonTable, paimonTable.snapshotManager().latestSnapshot(), 
schema);
 
     assertNotNull(result);
     assertFalse(result.isEmpty());
@@ -68,18 +68,26 @@ public class TestPaimonDataFileExtractor {
     assertTrue(dataFile.getFileSizeBytes() > 0);
     assertEquals(5, dataFile.getRecordCount());
     assertEquals(0, dataFile.getPartitionValues().size());
+    // check all fields have stats, and stats values (min->max range) are not 
null
+    assertEquals(
+        schema.getFields().size(),
+        dataFile.getColumnStats().stream()
+            .filter(stat -> stat.getRange() != null)
+            .collect(Collectors.toList())
+            .size());
   }
 
   @Test
   void testToInternalDataFilesWithPartitionedTable() {
     createPartitionedTable();
+    InternalSchema schema = 
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
 
     // Insert some data to create files
     testTable.insertRows(5);
 
     List<InternalDataFile> result =
         extractor.toInternalDataFiles(
-            paimonTable, paimonTable.snapshotManager().latestSnapshot(), 
testSchema);
+            paimonTable, paimonTable.snapshotManager().latestSnapshot(), 
schema);
 
     assertNotNull(result);
     assertFalse(result.isEmpty());
@@ -90,11 +98,19 @@ public class TestPaimonDataFileExtractor {
     assertTrue(dataFile.getFileSizeBytes() > 0);
     assertEquals(5, dataFile.getRecordCount());
     assertNotNull(dataFile.getPartitionValues());
+    // check all fields have stats, and stats values (min->max range) are not 
null
+    assertEquals(
+        schema.getFields().size(),
+        dataFile.getColumnStats().stream()
+            .filter(stat -> stat.getRange() != null)
+            .collect(Collectors.toList())
+            .size());
   }
 
   @Test
   void testToInternalDataFilesWithTableWithPrimaryKeys() {
     createTableWithPrimaryKeys();
+    InternalSchema schema = 
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
 
     // Insert some data to create files
     testTable.insertRows(5);
@@ -102,7 +118,7 @@ public class TestPaimonDataFileExtractor {
     // Get the latest snapshot
     List<InternalDataFile> result =
         extractor.toInternalDataFiles(
-            paimonTable, paimonTable.snapshotManager().latestSnapshot(), 
testSchema);
+            paimonTable, paimonTable.snapshotManager().latestSnapshot(), 
schema);
 
     assertNotNull(result);
     assertFalse(result.isEmpty());
@@ -111,18 +127,26 @@ public class TestPaimonDataFileExtractor {
     assertNotNull(dataFile.getPhysicalPath());
     assertTrue(dataFile.getFileSizeBytes() > 0);
     assertEquals(5, dataFile.getRecordCount());
+    // check all fields have stats, and stats values (min->max range) are not 
null
+    assertEquals(
+        schema.getFields().size(),
+        dataFile.getColumnStats().stream()
+            .filter(stat -> stat.getRange() != null)
+            .collect(Collectors.toList())
+            .size());
   }
 
   @Test
   void testPhysicalPathFormat() {
     createUnpartitionedTable();
+    InternalSchema schema = 
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
 
     // Insert data
     testTable.insertRows(2);
 
     List<InternalDataFile> result =
         extractor.toInternalDataFiles(
-            paimonTable, paimonTable.snapshotManager().latestSnapshot(), 
testSchema);
+            paimonTable, paimonTable.snapshotManager().latestSnapshot(), 
schema);
 
     assertFalse(result.isEmpty());
 
@@ -133,25 +157,10 @@ public class TestPaimonDataFileExtractor {
     }
   }
 
-  @Test
-  void testColumnStatsAreEmpty() {
-    createUnpartitionedTable();
-
-    testTable.insertRows(1);
-
-    List<InternalDataFile> result =
-        extractor.toInternalDataFiles(
-            paimonTable, paimonTable.snapshotManager().latestSnapshot(), 
testSchema);
-
-    assertFalse(result.isEmpty());
-    for (InternalDataFile dataFile : result) {
-      assertEquals(0, dataFile.getColumnStats().size());
-    }
-  }
-
   @Test
   void testExtractFilesDiffWithNewFiles() {
     createUnpartitionedTable();
+    InternalSchema schema = 
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
 
     // Insert initial data
     testTable.insertRows(5);
@@ -163,8 +172,7 @@ public class TestPaimonDataFileExtractor {
     Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
     assertNotNull(secondSnapshot);
 
-    InternalFilesDiff filesDiff =
-        extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema);
+    InternalFilesDiff filesDiff = extractor.extractFilesDiff(paimonTable, 
secondSnapshot, schema);
 
     // Verify we have replaced the single file on this setup
     assertNotNull(filesDiff);
@@ -181,6 +189,7 @@ public class TestPaimonDataFileExtractor {
   @Test
   void testExtractFilesDiffWithPartitionedTable() {
     createPartitionedTable();
+    InternalSchema schema = 
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
 
     // Insert initial data
     testTable.insertRows(5);
@@ -192,8 +201,7 @@ public class TestPaimonDataFileExtractor {
     Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
     assertNotNull(secondSnapshot);
 
-    InternalFilesDiff filesDiff =
-        extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema);
+    InternalFilesDiff filesDiff = extractor.extractFilesDiff(paimonTable, 
secondSnapshot, schema);
 
     // Verify we have added files with partition values
     assertNotNull(filesDiff);
@@ -207,6 +215,7 @@ public class TestPaimonDataFileExtractor {
   @Test
   void testExtractFilesDiffWithTableWithPrimaryKeys() {
     createTableWithPrimaryKeys();
+    InternalSchema schema = 
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
 
     // Insert initial data
     testTable.insertRows(5);
@@ -218,8 +227,7 @@ public class TestPaimonDataFileExtractor {
     Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
     assertNotNull(secondSnapshot);
 
-    InternalFilesDiff filesDiff =
-        extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema);
+    InternalFilesDiff filesDiff = extractor.extractFilesDiff(paimonTable, 
secondSnapshot, schema);
 
     // Verify the diff is returned (size may vary based on compaction)
     assertNotNull(filesDiff);
@@ -230,14 +238,14 @@ public class TestPaimonDataFileExtractor {
   @Test
   void testExtractFilesDiffForFirstSnapshot() {
     createUnpartitionedTable();
+    InternalSchema schema = 
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
 
     // Insert data to create first snapshot
     testTable.insertRows(5);
     Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
     assertNotNull(firstSnapshot);
 
-    InternalFilesDiff filesDiff =
-        extractor.extractFilesDiff(paimonTable, firstSnapshot, testSchema);
+    InternalFilesDiff filesDiff = extractor.extractFilesDiff(paimonTable, 
firstSnapshot, schema);
 
     // First snapshot should only have added files
     assertNotNull(filesDiff);
@@ -250,8 +258,6 @@ public class TestPaimonDataFileExtractor {
         (TestPaimonTable)
             TestPaimonTable.createTable("test_table", null, tempDir, new 
Configuration(), false);
     paimonTable = testTable.getPaimonTable();
-    testSchema =
-        InternalSchema.builder().build(); // empty schema won't matter for 
non-partitioned tables
   }
 
   private void createPartitionedTable() {
@@ -259,15 +265,6 @@ public class TestPaimonDataFileExtractor {
         (TestPaimonTable)
             TestPaimonTable.createTable("test_table", "level", tempDir, new 
Configuration(), false);
     paimonTable = testTable.getPaimonTable();
-
-    // just the partition field matters for this test
-    InternalField partitionField =
-        InternalField.builder()
-            .name("level")
-            
.schema(InternalSchema.builder().dataType(InternalType.STRING).build())
-            .build();
-
-    testSchema = 
InternalSchema.builder().fields(Collections.singletonList(partitionField)).build();
   }
 
   private void createTableWithPrimaryKeys() {
@@ -275,7 +272,5 @@ public class TestPaimonDataFileExtractor {
         (TestPaimonTable)
             TestPaimonTable.createTable("test_table", null, tempDir, new 
Configuration(), false);
     paimonTable = testTable.getPaimonTable();
-    testSchema =
-        InternalSchema.builder().build(); // empty schema won't matter for 
non-partitioned tables
   }
 }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonStatsExtractor.java
 
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonStatsExtractor.java
new file mode 100644
index 00000000..a75fef0a
--- /dev/null
+++ 
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonStatsExtractor.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.math.BigDecimal;
+import java.nio.file.Path;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.TestPaimonTable;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+@Log4j2
+public class TestPaimonStatsExtractor {
+  private static final PaimonDataFileExtractor extractor = 
PaimonDataFileExtractor.getInstance();
+  private static final PaimonSchemaExtractor schemaExtractor = 
PaimonSchemaExtractor.getInstance();
+
+  @TempDir private Path tempDir;
+  private TestPaimonTable testTable;
+  private FileStoreTable paimonTable;
+
+  @Test
+  void testColumnStatsUnpartitioned() {
+    createUnpartitionedTable();
+    InternalSchema schema = 
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
+
+    List<GenericRow> rows = testTable.insertRows(10);
+
+    List<InternalDataFile> result =
+        extractor.toInternalDataFiles(
+            paimonTable, paimonTable.snapshotManager().latestSnapshot(), 
schema);
+
+    assertFalse(result.isEmpty());
+    InternalDataFile dataFile = result.get(0);
+    List<ColumnStat> stats = dataFile.getColumnStats();
+    assertFalse(stats.isEmpty());
+
+    // Verify "id" stats (INT)
+    int minId = rows.stream().map(r -> 
r.getInt(0)).min(Integer::compareTo).get();
+    int maxId = rows.stream().map(r -> 
r.getInt(0)).max(Integer::compareTo).get();
+    ColumnStat idStat = getColumnStat(stats, "id");
+    assertEquals(Range.vector(minId, maxId), idStat.getRange());
+    assertEquals(0, idStat.getNumNulls());
+
+    // Verify "name" stats (STRING)
+    String minName = rows.stream().map(r -> 
r.getString(1).toString()).min(String::compareTo).get();
+    String maxName = rows.stream().map(r -> 
r.getString(1).toString()).max(String::compareTo).get();
+    ColumnStat nameStat = getColumnStat(stats, "name");
+    assertEquals(Range.vector(minName, maxName), nameStat.getRange());
+    assertEquals(0, nameStat.getNumNulls());
+
+    // Verify "value" stats (DOUBLE)
+    double minValue = rows.stream().map(r -> 
r.getDouble(2)).min(Double::compareTo).get();
+    double maxValue = rows.stream().map(r -> 
r.getDouble(2)).max(Double::compareTo).get();
+    ColumnStat valueStat = getColumnStat(stats, "value");
+    assertEquals(Range.vector(minValue, maxValue), valueStat.getRange());
+    assertEquals(0, valueStat.getNumNulls());
+
+    // Verify "created_at" stats (TIMESTAMP)
+    Timestamp minCreatedAt =
+        rows.stream().map(r -> r.getTimestamp(3, 
9)).min(Timestamp::compareTo).get();
+    Timestamp maxCreatedAt =
+        rows.stream().map(r -> r.getTimestamp(3, 
9)).max(Timestamp::compareTo).get();
+    ColumnStat createdAtStat = getColumnStat(stats, "created_at");
+    assertEquals(
+        Range.vector(minCreatedAt.toMicros(), maxCreatedAt.toMicros()), 
createdAtStat.getRange());
+    assertEquals(0, createdAtStat.getNumNulls());
+
+    // Verify "updated_at" stats (TIMESTAMP)
+    Timestamp minUpdatedAt =
+        rows.stream().map(r -> r.getTimestamp(4, 
9)).min(Timestamp::compareTo).get();
+    Timestamp maxUpdatedAt =
+        rows.stream().map(r -> r.getTimestamp(4, 
9)).max(Timestamp::compareTo).get();
+    ColumnStat updatedAtStat = getColumnStat(stats, "updated_at");
+    assertEquals(
+        Range.vector(minUpdatedAt.toMicros(), maxUpdatedAt.toMicros()), 
updatedAtStat.getRange());
+    assertEquals(0, updatedAtStat.getNumNulls());
+
+    // Verify "is_active" stats (BOOLEAN)
+    ColumnStat isActiveStat = getColumnStat(stats, "is_active");
+    assertEquals(Range.vector(false, true), isActiveStat.getRange());
+    assertEquals(0, isActiveStat.getNumNulls());
+
+    // Verify "description" stats (VARCHAR(255))
+    String minDescription =
+        rows.stream().map(r -> 
r.getString(6).toString()).min(String::compareTo).get();
+    String maxDescription =
+        rows.stream().map(r -> 
r.getString(6).toString()).max(String::compareTo).get();
+    ColumnStat descriptionStat = getColumnStat(stats, "description");
+    assertEquals(Range.vector(minDescription, maxDescription), 
descriptionStat.getRange());
+    assertEquals(0, descriptionStat.getNumNulls());
+  }
+
+  @Test
+  void testColumnStatsPartitionedTable() {
+    createPartitionedTable();
+    InternalSchema schema = 
schemaExtractor.toInternalSchema(testTable.getPaimonTable().schema());
+
+    testTable.insertRows(10);
+
+    List<InternalDataFile> result =
+        extractor.toInternalDataFiles(
+            paimonTable, paimonTable.snapshotManager().latestSnapshot(), 
schema);
+
+    assertFalse(result.isEmpty());
+    InternalDataFile dataFile = result.get(0);
+    List<ColumnStat> stats = dataFile.getColumnStats();
+    assertFalse(stats.isEmpty());
+
+    // check that the extracted stats preserve the original column order
+    // no need to check stats range, these are covered in 
testColumnStatsUnpartitioned
+    assertEquals("id", stats.get(0).getField().getName());
+    assertEquals("name", stats.get(1).getField().getName());
+    assertEquals("value", stats.get(2).getField().getName());
+    assertEquals("created_at", stats.get(3).getField().getName());
+    assertEquals("updated_at", stats.get(4).getField().getName());
+    assertEquals("is_active", stats.get(5).getField().getName());
+    assertEquals("description", stats.get(6).getField().getName());
+    assertEquals("level", stats.get(7).getField().getName());
+
+    // check stats range for the partition column (level)
+    assertEquals(Range.scalar(GenericTable.LEVEL_VALUES.get(0)), 
stats.get(7).getRange());
+  }
+
+  @Test
+  void testTimestampPrecisionStats() {
+    Schema schema =
+        Schema.newBuilder()
+            .primaryKey("id")
+            .column("id", DataTypes.INT())
+            .column("ts_millis", DataTypes.TIMESTAMP(3))
+            .column("ts_micros", DataTypes.TIMESTAMP(6))
+            .column("ts_nanos", DataTypes.TIMESTAMP(9))
+            .option("bucket", "1")
+            .option("bucket-key", "id")
+            .option("full-compaction.delta-commits", "1")
+            .build();
+
+    FileStoreTable table =
+        ((TestPaimonTable)
+                TestPaimonTable.createTable(
+                    "ts_precision", null, tempDir, new Configuration(), false, 
schema))
+            .getPaimonTable();
+
+    Timestamp millisOne =
+        Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 1, 1, 0, 0, 0, 
123_000_000));
+    Timestamp microsOne =
+        Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 1, 1, 0, 0, 0, 
123_456_000));
+    Timestamp nanosOne =
+        Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 1, 1, 0, 0, 0, 
123_456_789));
+
+    Timestamp millisTwo =
+        Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 1, 1, 0, 0, 1, 
987_000_000));
+    Timestamp microsTwo =
+        Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 1, 1, 0, 0, 1, 
987_654_000));
+    Timestamp nanosTwo =
+        Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 1, 1, 0, 0, 1, 
987_654_321));
+
+    TestPaimonTable.writeRows(
+        table,
+        Arrays.asList(
+            GenericRow.of(1, millisOne, microsOne, nanosOne),
+            GenericRow.of(2, millisTwo, microsTwo, nanosTwo)));
+
+    InternalSchema internalSchema = 
schemaExtractor.toInternalSchema(table.schema());
+    List<ColumnStat> stats =
+        extractor
+            .toInternalDataFiles(table, 
table.snapshotManager().latestSnapshot(), internalSchema)
+            .get(0)
+            .getColumnStats();
+
+    Range millisRange = getColumnStat(stats, "ts_millis").getRange();
+    assertEquals(Range.vector(millisOne.getMillisecond(), 
millisTwo.getMillisecond()), millisRange);
+
+    Range microsRange = getColumnStat(stats, "ts_micros").getRange();
+    assertEquals(Range.vector(microsOne.toMicros(), microsTwo.toMicros()), 
microsRange);
+
+    Range nanosRange = getColumnStat(stats, "ts_nanos").getRange();
+    // TODO: Paimon does not fully support stats at nanos precision - this is 
null for parquet
+    // format in 1.3.1
+    assertNull(nanosRange);
+  }
+
+  @Test
+  void testDecimalDateAndNullStatsWithLongStrings() {
+    Schema schema =
+        Schema.newBuilder()
+            .primaryKey("id")
+            .column("id", DataTypes.INT())
+            .column("price", DataTypes.DECIMAL(10, 2))
+            .column("event_date", DataTypes.DATE())
+            .column("notes", DataTypes.STRING())
+            .option("bucket", "1")
+            .option("bucket-key", "id")
+            .option("full-compaction.delta-commits", "1")
+            .option("metadata.stats-mode", "truncate(16)")
+            .build();
+
+    FileStoreTable table =
+        ((TestPaimonTable)
+                TestPaimonTable.createTable(
+                    "decimal_date", null, tempDir, new Configuration(), false, 
schema))
+            .getPaimonTable();
+
+    String longA = repeatChar('a', 32);
+    String longB = repeatChar('b', 32);
+    String longC = repeatChar('c', 32);
+    int jan1 = (int) LocalDate.of(2024, 1, 1).toEpochDay();
+    int jan2 = (int) LocalDate.of(2024, 1, 2).toEpochDay();
+
+    GenericRow row1 =
+        GenericRow.of(
+            1,
+            Decimal.fromBigDecimal(new BigDecimal("12.34"), 10, 2),
+            jan1,
+            BinaryString.fromString(longA));
+    GenericRow row2 = GenericRow.of(2, null, jan2, 
BinaryString.fromString(longB));
+    GenericRow row3 =
+        GenericRow.of(
+            3,
+            Decimal.fromBigDecimal(new BigDecimal("-5.50"), 10, 2),
+            null,
+            BinaryString.fromString(longC));
+
+    TestPaimonTable.writeRows(table, Arrays.asList(row1, row2, row3));
+
+    InternalSchema internalSchema = 
schemaExtractor.toInternalSchema(table.schema());
+    List<ColumnStat> stats =
+        extractor
+            .toInternalDataFiles(table, 
table.snapshotManager().latestSnapshot(), internalSchema)
+            .get(0)
+            .getColumnStats();
+
+    ColumnStat priceStat = getColumnStat(stats, "price");
+    assertEquals(
+        Range.vector(new BigDecimal("-5.50"), new BigDecimal("12.34")), 
priceStat.getRange());
+    assertEquals(1, priceStat.getNumNulls());
+    assertEquals(3, priceStat.getNumValues());
+
+    ColumnStat dateStat = getColumnStat(stats, "event_date");
+    assertEquals(Range.vector(jan1, jan2), dateStat.getRange());
+    assertEquals(1, dateStat.getNumNulls());
+    assertEquals(3, dateStat.getNumValues());
+
+    ColumnStat notesStat = getColumnStat(stats, "notes");
+    String minNotes = (String) notesStat.getRange().getMinValue();
+    String maxNotes = (String) notesStat.getRange().getMaxValue();
+    assertEquals(repeatChar('a', 16), minNotes);
+    assertEquals(repeatChar('c', 15) + 'd', maxNotes);
+    assertEquals(16, minNotes.length());
+    assertEquals(16, maxNotes.length());
+    assertEquals(0, notesStat.getNumNulls());
+    assertEquals(3, notesStat.getNumValues());
+  }
+
+  @Test
+  void testFieldLevelStats() {
+    Schema schema =
+        Schema.newBuilder()
+            .primaryKey("id")
+            .column("id", DataTypes.INT())
+            .column("foo", DataTypes.STRING())
+            .column("bar", DataTypes.STRING())
+            .column("boo", DataTypes.STRING())
+            .option("bucket", "1")
+            .option("bucket-key", "id")
+            .option("full-compaction.delta-commits", "1")
+            .option("metadata.stats-mode", "none")
+            .option("fields.id.stats-mode", "truncate(16)")
+            .option("fields.foo.stats-mode", "truncate(16)")
+            .build();
+
+    FileStoreTable table =
+        ((TestPaimonTable)
+                TestPaimonTable.createTable(
+                    "field_level_stats", null, tempDir, new Configuration(), 
false, schema))
+            .getPaimonTable();
+
+    GenericRow row1 =
+        GenericRow.of(
+            1,
+            BinaryString.fromString("foo1"),
+            BinaryString.fromString("bar1"),
+            BinaryString.fromString("boo1"));
+    GenericRow row2 =
+        GenericRow.of(
+            2,
+            BinaryString.fromString("foo2"),
+            BinaryString.fromString("bar2"),
+            BinaryString.fromString("boo2"));
+    GenericRow row3 =
+        GenericRow.of(
+            3,
+            BinaryString.fromString("foo3"),
+            BinaryString.fromString("bar3"),
+            BinaryString.fromString("boo3"));
+
+    TestPaimonTable.writeRows(table, Arrays.asList(row1, row2, row3));
+
+    InternalSchema internalSchema = 
schemaExtractor.toInternalSchema(table.schema());
+    List<ColumnStat> stats =
+        extractor
+            .toInternalDataFiles(table, 
table.snapshotManager().latestSnapshot(), internalSchema)
+            .get(0)
+            .getColumnStats();
+
+    ColumnStat idStat = getColumnStat(stats, "id");
+    assertEquals(Range.vector(1, 3), idStat.getRange());
+  }
+
+  @Test
+  void testPaimonNoStats() {
+    Schema schema =
+        Schema.newBuilder()
+            .primaryKey("id")
+            .column("id", DataTypes.INT())
+            .column("foo", DataTypes.STRING())
+            .column("bar", DataTypes.STRING())
+            .column("boo", DataTypes.STRING())
+            .option("bucket", "1")
+            .option("bucket-key", "id")
+            .option("full-compaction.delta-commits", "1")
+            .option("metadata.stats-mode", "none")
+            .build();
+
+    FileStoreTable table =
+        ((TestPaimonTable)
+                TestPaimonTable.createTable(
+                    "field_level_stats", null, tempDir, new Configuration(), 
false, schema))
+            .getPaimonTable();
+
+    GenericRow row1 =
+        GenericRow.of(
+            1,
+            BinaryString.fromString("foo1"),
+            BinaryString.fromString("bar1"),
+            BinaryString.fromString("boo1"));
+    GenericRow row2 =
+        GenericRow.of(
+            2,
+            BinaryString.fromString("foo2"),
+            BinaryString.fromString("bar2"),
+            BinaryString.fromString("boo2"));
+    GenericRow row3 =
+        GenericRow.of(
+            3,
+            BinaryString.fromString("foo3"),
+            BinaryString.fromString("bar3"),
+            BinaryString.fromString("boo3"));
+
+    TestPaimonTable.writeRows(table, Arrays.asList(row1, row2, row3));
+
+    InternalSchema internalSchema = 
schemaExtractor.toInternalSchema(table.schema());
+    List<ColumnStat> stats =
+        extractor
+            .toInternalDataFiles(table, 
table.snapshotManager().latestSnapshot(), internalSchema)
+            .get(0)
+            .getColumnStats();
+
+    assertEquals(0, stats.size());
+  }
+
+  @Test
+  void testPaimonDropStats() {
+    Schema schema =
+        Schema.newBuilder()
+            .primaryKey("id")
+            .column("id", DataTypes.INT())
+            .column("foo", DataTypes.STRING())
+            .column("bar", DataTypes.STRING())
+            .column("boo", DataTypes.STRING())
+            .option("bucket", "1")
+            .option("bucket-key", "id")
+            .option("full-compaction.delta-commits", "1")
+            .option("metadata.stats-mode", "full")
+            .option("manifest.delete-file-drop-stats", "true")
+            .build();
+
+    FileStoreTable table =
+        ((TestPaimonTable)
+                TestPaimonTable.createTable(
+                    "field_level_stats", null, tempDir, new Configuration(), 
false, schema))
+            .getPaimonTable();
+
+    GenericRow row1 =
+        GenericRow.of(
+            1,
+            BinaryString.fromString("foo1"),
+            BinaryString.fromString("bar1"),
+            BinaryString.fromString("boo1"));
+    GenericRow row2 =
+        GenericRow.of(
+            2,
+            BinaryString.fromString("foo2"),
+            BinaryString.fromString("bar2"),
+            BinaryString.fromString("boo2"));
+    GenericRow row3 =
+        GenericRow.of(
+            3,
+            BinaryString.fromString("foo3"),
+            BinaryString.fromString("bar3"),
+            BinaryString.fromString("boo3"));
+
+    TestPaimonTable.writeRows(table, Arrays.asList(row1, row2, row3));
+
+    InternalSchema internalSchema = 
schemaExtractor.toInternalSchema(table.schema());
+    List<ColumnStat> stats =
+        extractor
+            .toInternalDataFiles(table, 
table.snapshotManager().latestSnapshot(), internalSchema)
+            .get(0)
+            .getColumnStats();
+
+    // compaction creates commits that are DELETE and ADD on the same file
+    // with `manifest.delete-file-drop-stats` enabled, this means stats are 
empty after compaction
+    // this is a smoke test to ensure exceptions aren't raised for this 
scenario
+    // See also: https://github.com/apache/paimon/issues/7026
+    assertEquals(0, stats.size());
+  }
+
+  @Test
+  void testComplexFieldStats() {
+    // Paimon does not collect stats on nested fields or array fields
+    Schema schema =
+        Schema.newBuilder()
+            .primaryKey("id")
+            .column("id", DataTypes.INT())
+            .column(
+                "nested",
+                DataTypes.ROW(
+                    DataTypes.FIELD(1, "f1", DataTypes.STRING()),
+                    DataTypes.FIELD(2, "f2", DataTypes.INT())))
+            .column("array", DataTypes.ARRAY(DataTypes.INT()))
+            .option("bucket", "1")
+            .option("bucket-key", "id")
+            .option("full-compaction.delta-commits", "1")
+            .option("metadata.stats-mode", "full")
+            .build();
+
+    FileStoreTable table =
+        ((TestPaimonTable)
+                TestPaimonTable.createTable(
+                    "nested_field_stats", null, tempDir, new Configuration(), 
false, schema))
+            .getPaimonTable();
+
+    GenericRow row1 =
+        GenericRow.of(
+            1, GenericRow.of(BinaryString.fromString("a"), 10), new 
GenericArray(new int[] {1, 2}));
+    GenericRow row2 =
+        GenericRow.of(
+            2, GenericRow.of(BinaryString.fromString("b"), 20), new 
GenericArray(new int[] {3, 4}));
+    GenericRow row3 =
+        GenericRow.of(
+            3, GenericRow.of(BinaryString.fromString("c"), 30), new 
GenericArray(new int[] {}));
+
+    TestPaimonTable.writeRows(table, Arrays.asList(row1, row2, row3));
+
+    InternalSchema internalSchema = 
schemaExtractor.toInternalSchema(table.schema());
+    List<ColumnStat> stats =
+        extractor
+            .toInternalDataFiles(table, 
table.snapshotManager().latestSnapshot(), internalSchema)
+            .get(0)
+            .getColumnStats();
+
+    // only the id column has stats; nested fields and array fields do not have stats
+    assertEquals(1, stats.size());
+    ColumnStat idStat = getColumnStat(stats, "id");
+    assertEquals(Range.vector(1, 3), idStat.getRange());
+    assertEquals(0, idStat.getNumNulls());
+    assertEquals(3, idStat.getNumValues());
+  }
+
+  private void createUnpartitionedTable() {
+    testTable =
+        (TestPaimonTable)
+            TestPaimonTable.createTable("test_table", null, tempDir, new 
Configuration(), false);
+    paimonTable = testTable.getPaimonTable();
+  }
+
+  private void createPartitionedTable() {
+    testTable =
+        (TestPaimonTable)
+            TestPaimonTable.createTable("test_table", "level", tempDir, new 
Configuration(), false);
+    paimonTable = testTable.getPaimonTable();
+  }
+
+  private ColumnStat getColumnStat(List<ColumnStat> stats, String columnName) {
+    return stats.stream()
+        .filter(stat -> stat.getField().getName().equals(columnName))
+        .findFirst()
+        .orElseThrow(
+            () -> new IllegalArgumentException("Column stat not found for 
column: " + columnName));
+  }
+
+  private String repeatChar(char ch, int count) {
+    char[] chars = new char[count];
+    Arrays.fill(chars, ch);
+    return new String(chars);
+  }
+}
diff --git 
a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml 
b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
index e9217a33..e646507b 100644
--- a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
+++ b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
@@ -38,4 +38,6 @@ tableFormatConverters:
         spark.app.name: xtable
     ICEBERG:
       conversionSourceProviderClass: 
org.apache.xtable.iceberg.IcebergConversionSourceProvider
-      conversionTargetProviderClass: 
org.apache.xtable.iceberg.IcebergConversionTarget
\ No newline at end of file
+      conversionTargetProviderClass: 
org.apache.xtable.iceberg.IcebergConversionTarget
+    PAIMON:
+      conversionSourceProviderClass: 
org.apache.xtable.paimon.PaimonConversionSourceProvider
diff --git 
a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java 
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
index 273854a0..a5967442 100644
--- 
a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
+++ 
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
@@ -21,6 +21,7 @@ package org.apache.xtable.utilities;
 import static org.apache.xtable.model.storage.TableFormat.DELTA;
 import static org.apache.xtable.model.storage.TableFormat.HUDI;
 import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PAIMON;
 
 import java.io.IOException;
 import java.net.URL;
@@ -104,10 +105,11 @@ class TestRunSync {
   public void testTableFormatConverterConfigDefault() throws IOException {
     TableFormatConverters converters = 
RunSync.loadTableFormatConversionConfigs(null);
     Map<String, ConversionConfig> tfConverters = 
converters.getTableFormatConverters();
-    Assertions.assertEquals(3, tfConverters.size());
+    Assertions.assertEquals(4, tfConverters.size());
     Assertions.assertNotNull(tfConverters.get(DELTA));
     Assertions.assertNotNull(tfConverters.get(HUDI));
     Assertions.assertNotNull(tfConverters.get(ICEBERG));
+    Assertions.assertNotNull(tfConverters.get(PAIMON));
 
     Assertions.assertEquals(
         "org.apache.xtable.hudi.HudiConversionSourceProvider",
@@ -127,6 +129,9 @@ class TestRunSync {
     Assertions.assertEquals(
         "org.apache.xtable.delta.DeltaConversionSourceProvider",
         tfConverters.get(DELTA).getConversionSourceProviderClass());
+    Assertions.assertEquals(
+        "org.apache.xtable.paimon.PaimonConversionSourceProvider",
+        tfConverters.get(PAIMON).getConversionSourceProviderClass());
   }
 
   @Test
@@ -144,7 +149,7 @@ class TestRunSync {
     TableFormatConverters converters =
         RunSync.loadTableFormatConversionConfigs(customConverters.getBytes());
     Map<String, ConversionConfig> tfConverters = 
converters.getTableFormatConverters();
-    Assertions.assertEquals(4, tfConverters.size());
+    Assertions.assertEquals(5, tfConverters.size());
 
     Assertions.assertNotNull(tfConverters.get("NEW_FORMAT"));
     Assertions.assertEquals(

Reply via email to