the-other-tim-brown commented on code in PR #742:
URL: https://github.com/apache/incubator-xtable/pull/742#discussion_r2483771095


##########
xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import static org.apache.xtable.model.storage.DataLayoutStrategy.HIVE_STYLE_PARTITION;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Log4j2
+public class PaimonConversionSource implements ConversionSource<Snapshot> {
+
+  private final FileStoreTable paimonTable;
+  private final SchemaManager schemaManager;
+  private final SnapshotManager snapshotManager;
+
+  private final PaimonDataFileExtractor dataFileExtractor = PaimonDataFileExtractor.getInstance();
+  private final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance();
+  private final PaimonPartitionExtractor partitionSpecExtractor =
+      PaimonPartitionExtractor.getInstance();
+
+  public PaimonConversionSource(FileStoreTable paimonTable) {
+    this.paimonTable = paimonTable;
+    this.schemaManager = paimonTable.schemaManager();
+    this.snapshotManager = paimonTable.snapshotManager();
+  }
+
+  @Override
+  public InternalTable getTable(Snapshot snapshot) {
+    TableSchema paimonSchema = schemaManager.schema(snapshot.schemaId());
+    InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);
+
+    List<String> partitionKeys = paimonTable.partitionKeys();
+    List<InternalPartitionField> partitioningFields =
+        partitionSpecExtractor.toInternalPartitionFields(partitionKeys, internalSchema);
+
+    DataLayoutStrategy dataLayoutStrategy =
+        partitioningFields.isEmpty() ? DataLayoutStrategy.FLAT : HIVE_STYLE_PARTITION;
+
+    return InternalTable.builder()
+        .name(paimonTable.name())
+        .tableFormat(TableFormat.PAIMON)
+        .readSchema(internalSchema)
+        .layoutStrategy(dataLayoutStrategy)
+        .basePath(paimonTable.location().toString())
+        .partitioningFields(partitioningFields)
+        .latestCommitTime(Instant.ofEpochMilli(snapshot.timeMillis()))
+        .latestMetadataPath(snapshotManager.snapshotPath(snapshot.id()).toString())
+        .build();
+  }
+
+  @Override
+  public InternalTable getCurrentTable() {
+    SnapshotManager snapshotManager = paimonTable.snapshotManager();
+    Snapshot snapshot = snapshotManager.latestSnapshot();
+    if (snapshot == null) {
+      throw new ReadException("No snapshots found for table " + paimonTable.name());
+    }

Review Comment:
   Nitpick: can we move this code and lines 99-103 into a small private helper method?
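   One possible shape for such a helper, as a rough sketch (the method name is just illustrative):
   
   ```java
   // Hypothetical helper consolidating the duplicated "latest snapshot or fail" logic
   // used by getCurrentTable() and getCurrentSnapshot().
   private Snapshot latestSnapshotOrThrow() {
     Snapshot snapshot = snapshotManager.latestSnapshot();
     if (snapshot == null) {
       throw new ReadException("No snapshots found for table " + paimonTable.name());
     }
     return snapshot;
   }
   ```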



##########
xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import static org.apache.xtable.model.storage.DataLayoutStrategy.HIVE_STYLE_PARTITION;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Log4j2
+public class PaimonConversionSource implements ConversionSource<Snapshot> {
+
+  private final FileStoreTable paimonTable;
+  private final SchemaManager schemaManager;
+  private final SnapshotManager snapshotManager;
+
+  private final PaimonDataFileExtractor dataFileExtractor = PaimonDataFileExtractor.getInstance();
+  private final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance();
+  private final PaimonPartitionExtractor partitionSpecExtractor =
+      PaimonPartitionExtractor.getInstance();
+
+  public PaimonConversionSource(FileStoreTable paimonTable) {
+    this.paimonTable = paimonTable;
+    this.schemaManager = paimonTable.schemaManager();
+    this.snapshotManager = paimonTable.snapshotManager();
+  }
+
+  @Override
+  public InternalTable getTable(Snapshot snapshot) {
+    TableSchema paimonSchema = schemaManager.schema(snapshot.schemaId());
+    InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);
+
+    List<String> partitionKeys = paimonTable.partitionKeys();
+    List<InternalPartitionField> partitioningFields =
+        partitionSpecExtractor.toInternalPartitionFields(partitionKeys, internalSchema);
+
+    DataLayoutStrategy dataLayoutStrategy =
+        partitioningFields.isEmpty() ? DataLayoutStrategy.FLAT : HIVE_STYLE_PARTITION;

Review Comment:
   Does Paimon always include the field name in the path, or is there an option to omit it?
   
   For example, Hive style means a `date` partition would be `base/path/date=2025-11-01/data` and not simply `base/path/2025-11-01/data`.



##########
xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import java.util.*;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+public class PaimonDataFileExtractor {
+
+  private final PaimonPartitionExtractor partitionExtractor =
+      PaimonPartitionExtractor.getInstance();
+
+  private static final PaimonDataFileExtractor INSTANCE = new PaimonDataFileExtractor();
+
+  public static PaimonDataFileExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public List<InternalDataFile> toInternalDataFiles(FileStoreTable table, Snapshot snapshot) {
+    List<InternalDataFile> result = new ArrayList<>();
+    Iterator<ManifestEntry> manifestEntryIterator =
+        newSnapshotReader(table, snapshot).readFileIterator();
+    while (manifestEntryIterator.hasNext()) {
+      result.add(toInternalDataFile(table, manifestEntryIterator.next()));
+    }
+    return result;
+  }
+
+  private InternalDataFile toInternalDataFile(FileStoreTable table, ManifestEntry entry) {
+    return InternalDataFile.builder()
+        .physicalPath(toFullPhysicalPath(table, entry))
+        .fileSizeBytes(entry.file().fileSize())
+        .lastModified(entry.file().creationTimeEpochMillis())
+        .recordCount(entry.file().rowCount())
+        .partitionValues(partitionExtractor.toPartitionValues(table, entry.partition()))
+        .columnStats(toColumnStats(entry.file()))
+        .build();
+  }
+
+  private String toFullPhysicalPath(FileStoreTable table, ManifestEntry entry) {
+    String basePath = table.location().toString();
+    String bucketPath = "bucket-" + entry.bucket();
+    String filePath = entry.file().fileName();
+
+    Optional<String> partitionPath = partitionExtractor.toPartitionPath(table, entry.partition());
+    if (partitionPath.isPresent()) {
+      return String.join("/", basePath, partitionPath.get(), bucketPath, filePath);
+    } else {
+      return String.join("/", basePath, bucketPath, filePath);
+    }
+  }
+
+  private List<ColumnStat> toColumnStats(DataFileMeta file) {
+    // TODO: Implement logic to extract column stats from the file meta

Review Comment:
   Let's track this as a separate GH issue if we are not already tracking it.



##########
xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import java.io.IOException;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+public class PaimonConversionSourceProvider extends ConversionSourceProvider<Snapshot> {
+  @Override
+  public ConversionSource<Snapshot> getConversionSourceInstance(SourceTable sourceTableConfig) {
+    try {
+      Options catalogOptions = new Options();
+      CatalogContext context = CatalogContext.create(catalogOptions, hadoopConf);
+
+      Path path = new Path(sourceTableConfig.getDataPath());
+      FileIO fileIO = FileIO.get(path, context);
+      FileStoreTable paimonTable = FileStoreTableFactory.create(fileIO, path);
+
+      return new PaimonConversionSource(paimonTable);
+    } catch (IOException e) {
+      throw new ReadException(e.getMessage());

Review Comment:
   Let's update this handling so the stacktrace is propagated.
   
   `throw new ReadException("Failed to read Paimon table from file system", e)`



##########
xtable-core/src/test/java/org/apache/xtable/ITConversionController.java:
##########
@@ -104,9 +106,12 @@
 import org.apache.xtable.iceberg.TestIcebergDataHelper;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.model.sync.SyncMode;
+import org.apache.xtable.paimon.PaimonConversionSourceProvider;
 
 public class ITConversionController {
-  @TempDir public static Path tempDir;
+  @TempDir(cleanup = CleanupMode.NEVER) // TODO remove CleanupMode.NEVER after debugging

Review Comment:
   Can this be reverted now?



##########
xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import java.util.*;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+public class PaimonDataFileExtractor {
+
+  private final PaimonPartitionExtractor partitionExtractor =
+      PaimonPartitionExtractor.getInstance();
+
+  private static final PaimonDataFileExtractor INSTANCE = new PaimonDataFileExtractor();
+
+  public static PaimonDataFileExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public List<InternalDataFile> toInternalDataFiles(FileStoreTable table, Snapshot snapshot) {
+    List<InternalDataFile> result = new ArrayList<>();
+    Iterator<ManifestEntry> manifestEntryIterator =
+        newSnapshotReader(table, snapshot).readFileIterator();
+    while (manifestEntryIterator.hasNext()) {
+      result.add(toInternalDataFile(table, manifestEntryIterator.next()));
+    }
+    return result;
+  }
+
+  private InternalDataFile toInternalDataFile(FileStoreTable table, ManifestEntry entry) {
+    return InternalDataFile.builder()
+        .physicalPath(toFullPhysicalPath(table, entry))
+        .fileSizeBytes(entry.file().fileSize())
+        .lastModified(entry.file().creationTimeEpochMillis())
+        .recordCount(entry.file().rowCount())
+        .partitionValues(partitionExtractor.toPartitionValues(table, entry.partition()))
+        .columnStats(toColumnStats(entry.file()))
+        .build();
+  }
+
+  private String toFullPhysicalPath(FileStoreTable table, ManifestEntry entry) {
+    String basePath = table.location().toString();
+    String bucketPath = "bucket-" + entry.bucket();
+    String filePath = entry.file().fileName();
+
+    Optional<String> partitionPath = partitionExtractor.toPartitionPath(table, entry.partition());
+    if (partitionPath.isPresent()) {
+      return String.join("/", basePath, partitionPath.get(), bucketPath, filePath);
+    } else {
+      return String.join("/", basePath, bucketPath, filePath);
+    }
+  }
+
+  private List<ColumnStat> toColumnStats(DataFileMeta file) {
+    // TODO: Implement logic to extract column stats from the file meta
+    return Collections.emptyList();
+  }
+
+  private SnapshotReader newSnapshotReader(FileStoreTable table, Snapshot snapshot) {
+    // If the table has primary keys, we read only the top level files

Review Comment:
   Just curious, is this similar to the Hudi Merge on Read table?



##########
xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import java.io.IOException;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+public class PaimonConversionSourceProvider extends ConversionSourceProvider<Snapshot> {
+  @Override
+  public ConversionSource<Snapshot> getConversionSourceInstance(SourceTable sourceTableConfig) {
+    try {
+      Options catalogOptions = new Options();

Review Comment:
   We don't need any changes right now, but will the user want to supply some custom options here?



##########
xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import static org.apache.xtable.model.storage.DataLayoutStrategy.HIVE_STYLE_PARTITION;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Log4j2
+public class PaimonConversionSource implements ConversionSource<Snapshot> {
+
+  private final FileStoreTable paimonTable;
+  private final SchemaManager schemaManager;
+  private final SnapshotManager snapshotManager;
+
+  private final PaimonDataFileExtractor dataFileExtractor = PaimonDataFileExtractor.getInstance();
+  private final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance();
+  private final PaimonPartitionExtractor partitionSpecExtractor =
+      PaimonPartitionExtractor.getInstance();
+
+  public PaimonConversionSource(FileStoreTable paimonTable) {
+    this.paimonTable = paimonTable;
+    this.schemaManager = paimonTable.schemaManager();
+    this.snapshotManager = paimonTable.snapshotManager();
+  }
+
+  @Override
+  public InternalTable getTable(Snapshot snapshot) {
+    TableSchema paimonSchema = schemaManager.schema(snapshot.schemaId());
+    InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);
+
+    List<String> partitionKeys = paimonTable.partitionKeys();
+    List<InternalPartitionField> partitioningFields =
+        partitionSpecExtractor.toInternalPartitionFields(partitionKeys, internalSchema);
+
+    DataLayoutStrategy dataLayoutStrategy =
+        partitioningFields.isEmpty() ? DataLayoutStrategy.FLAT : HIVE_STYLE_PARTITION;
+
+    return InternalTable.builder()
+        .name(paimonTable.name())
+        .tableFormat(TableFormat.PAIMON)
+        .readSchema(internalSchema)
+        .layoutStrategy(dataLayoutStrategy)
+        .basePath(paimonTable.location().toString())
+        .partitioningFields(partitioningFields)
+        .latestCommitTime(Instant.ofEpochMilli(snapshot.timeMillis()))
+        .latestMetadataPath(snapshotManager.snapshotPath(snapshot.id()).toString())
+        .build();
+  }
+
+  @Override
+  public InternalTable getCurrentTable() {
+    SnapshotManager snapshotManager = paimonTable.snapshotManager();
+    Snapshot snapshot = snapshotManager.latestSnapshot();
+    if (snapshot == null) {
+      throw new ReadException("No snapshots found for table " + paimonTable.name());
+    }
+    return getTable(snapshot);
+  }
+
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+    SnapshotManager snapshotManager = paimonTable.snapshotManager();
+    Snapshot snapshot = snapshotManager.latestSnapshot();
+    if (snapshot == null) {
+      throw new ReadException("No snapshots found for table " + paimonTable.name());
+    }
+
+    InternalTable internalTable = getTable(snapshot);
+    List<InternalDataFile> internalDataFiles =
+        dataFileExtractor.toInternalDataFiles(paimonTable, snapshot);
+
+    return InternalSnapshot.builder()
+        .table(internalTable)
+        .version(Long.toString(snapshot.timeMillis()))
+        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+        // TODO : Implement pending commits extraction, required for incremental sync

Review Comment:
   Nitpick: Can you create a GH Issue to track the incremental sync work if one does not already exist?



##########
xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+
+/** Extracts partition spec for Paimon as identity transforms on partition keys. */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PaimonPartitionExtractor {
+
+  private final PaimonSchemaExtractor paimonSchemaExtractor = PaimonSchemaExtractor.getInstance();
+
+  private static final PaimonPartitionExtractor INSTANCE = new PaimonPartitionExtractor();
+
+  public static PaimonPartitionExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public List<InternalPartitionField> toInternalPartitionFields(
+      List<String> partitionKeys, InternalSchema schema) {
+    if (partitionKeys == null || partitionKeys.isEmpty()) {
+      return Collections.emptyList();
+    }
+    return partitionKeys.stream()
+        .map(key -> toPartitionField(key, schema))
+        .collect(Collectors.toList());
+  }
+
+  public List<PartitionValue> toPartitionValues(FileStoreTable table, BinaryRow partition) {
+    InternalRowPartitionComputer partitionComputer = newPartitionComputer(table);
+    InternalSchema internalSchema = paimonSchemaExtractor.toInternalSchema(table.schema());

Review Comment:
   Can we pass in the schema to avoid this conversion per file?
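   For illustration, a sketch of that change (only the extra parameter differs from the code quoted above; the caller would build the `InternalSchema` once per snapshot and reuse it for every file):
   
   ```java
   public List<PartitionValue> toPartitionValues(
       FileStoreTable table, InternalSchema internalSchema, BinaryRow partition) {
     InternalRowPartitionComputer partitionComputer = newPartitionComputer(table);
     List<PartitionValue> partitionValues = new ArrayList<>();
     for (Map.Entry<String, String> entry :
         partitionComputer.generatePartValues(partition).entrySet()) {
       partitionValues.add(
           PartitionValue.builder()
               .partitionField(toPartitionField(entry.getKey(), internalSchema))
               .range(Range.scalar(entry.getValue()))
               .build());
     }
     return partitionValues;
   }
   ```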



##########
xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+
+/** Extracts partition spec for Paimon as identity transforms on partition keys. */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PaimonPartitionExtractor {
+
+  private final PaimonSchemaExtractor paimonSchemaExtractor = PaimonSchemaExtractor.getInstance();
+
+  private static final PaimonPartitionExtractor INSTANCE = new PaimonPartitionExtractor();
+
+  public static PaimonPartitionExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public List<InternalPartitionField> toInternalPartitionFields(
+      List<String> partitionKeys, InternalSchema schema) {
+    if (partitionKeys == null || partitionKeys.isEmpty()) {
+      return Collections.emptyList();
+    }
+    return partitionKeys.stream()
+        .map(key -> toPartitionField(key, schema))
+        .collect(Collectors.toList());
+  }
+
+  public List<PartitionValue> toPartitionValues(FileStoreTable table, BinaryRow partition) {
+    InternalRowPartitionComputer partitionComputer = newPartitionComputer(table);
+    InternalSchema internalSchema = paimonSchemaExtractor.toInternalSchema(table.schema());
+
+    List<PartitionValue> partitionValues = new ArrayList<>();

Review Comment:
   Nitpick: let's initialize the list with the size of the map produced by `partitionComputer.generatePartValues(partition)`. This is done per file, so we want to be as lean as possible when running the initial snapshot sync on very large tables.
   
   We can also return `Collections.emptyList()` when the table is not partitioned.
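   A minimal sketch with the current signature kept, assuming `generatePartValues` yields an empty map for an unpartitioned table:
   
   ```java
   public List<PartitionValue> toPartitionValues(FileStoreTable table, BinaryRow partition) {
     InternalRowPartitionComputer partitionComputer = newPartitionComputer(table);
     // Compute the map once, return early for unpartitioned tables, and pre-size the list.
     Map<String, String> partValues = partitionComputer.generatePartValues(partition);
     if (partValues.isEmpty()) {
       return Collections.emptyList();
     }
     InternalSchema internalSchema = paimonSchemaExtractor.toInternalSchema(table.schema());
     List<PartitionValue> partitionValues = new ArrayList<>(partValues.size());
     for (Map.Entry<String, String> entry : partValues.entrySet()) {
       partitionValues.add(
           PartitionValue.builder()
               .partitionField(toPartitionField(entry.getKey(), internalSchema))
               .range(Range.scalar(entry.getValue()))
               .build());
     }
     return partitionValues;
   }
   ```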



##########
xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+
+/** Extracts partition spec for Paimon as identity transforms on partition keys. */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PaimonPartitionExtractor {
+
+  private final PaimonSchemaExtractor paimonSchemaExtractor = PaimonSchemaExtractor.getInstance();
+
+  private static final PaimonPartitionExtractor INSTANCE = new PaimonPartitionExtractor();
+
+  public static PaimonPartitionExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public List<InternalPartitionField> toInternalPartitionFields(
+      List<String> partitionKeys, InternalSchema schema) {
+    if (partitionKeys == null || partitionKeys.isEmpty()) {
+      return Collections.emptyList();
+    }
+    return partitionKeys.stream()
+        .map(key -> toPartitionField(key, schema))
+        .collect(Collectors.toList());
+  }
+
+  public List<PartitionValue> toPartitionValues(FileStoreTable table, BinaryRow partition) {
+    InternalRowPartitionComputer partitionComputer = newPartitionComputer(table);
+    InternalSchema internalSchema = paimonSchemaExtractor.toInternalSchema(table.schema());
+
+    List<PartitionValue> partitionValues = new ArrayList<>();
+    for (Map.Entry<String, String> entry :
+        partitionComputer.generatePartValues(partition).entrySet()) {
+      PartitionValue partitionValue =
+          PartitionValue.builder()
+              .partitionField(toPartitionField(entry.getKey(), internalSchema))
+              .range(Range.scalar(entry.getValue()))
+              .build();
+      partitionValues.add(partitionValue);
+    }
+    return partitionValues;
+  }
+
+  public Optional<String> toPartitionPath(FileStoreTable table, BinaryRow partition) {
+    InternalRowPartitionComputer partitionComputer = newPartitionComputer(table);
+    return partitionComputer.generatePartValues(partition).entrySet().stream()
+        .map(e -> e.getKey() + "=" + e.getValue())
+        .reduce((a, b) -> a + "/" + b);
+  }
+
+  private InternalPartitionField toPartitionField(String key, InternalSchema schema) {
+    InternalField sourceField =
+        findField(schema, key)
+            .orElseThrow(
+                () -> new IllegalArgumentException("Partition key not found in schema: " + key));
+    return InternalPartitionField.builder()
+        .sourceField(sourceField)
+        .transformType(PartitionTransformType.VALUE)

Review Comment:
   Does Paimon allow users to partition on a timestamp?



##########
xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonSchemaExtractor.java:
##########
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+
+public class TestPaimonSchemaExtractor {
+  private static final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance();
+
+  private void assertField(DataField paimonField, InternalField expectedInternalField) {
+    assertField(paimonField, expectedInternalField, Collections.emptyList());
+  }
+
+  private void assertField(
+      DataField paimonField, InternalField expectedInternalField, List<String> primaryKeys) {
+    TableSchema paimonSchema =
+        new TableSchema(
+            0,
+            Collections.singletonList(paimonField),
+            0,
+            Collections.emptyList(),
+            primaryKeys,
+            new HashMap<>(),
+            "");
+    InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);
+    List<InternalField> recordKeyFields =
+        primaryKeys.isEmpty()
+            ? Collections.emptyList()
+            : Collections.singletonList(expectedInternalField);
+    InternalSchema expectedSchema =
+        InternalSchema.builder()
+            .name("record")
+            .dataType(InternalType.RECORD)
+            .fields(Collections.singletonList(expectedInternalField))
+            .recordKeyFields(recordKeyFields)
+            .build();
+    assertEquals(expectedSchema, internalSchema);
+  }
+
+  @Test
+  void testCharField() {
+    DataField paimonField = new DataField(0, "char_field", new CharType(10));
+    InternalField expectedField =
+        InternalField.builder()
+            .name("char_field")
+            .fieldId(0)
+            .schema(
+                InternalSchema.builder()
+                    .name("CHAR(10)")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testVarcharField() {
+    DataField paimonField = new DataField(1, "varchar_field", new VarCharType(255));
+    InternalField expectedField =
+        InternalField.builder()
+            .name("varchar_field")
+            .fieldId(1)
+            .schema(
+                InternalSchema.builder()
+                    .name("VARCHAR(255)")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testBooleanField() {
+    DataField paimonField = new DataField(2, "boolean_field", new BooleanType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("boolean_field")
+            .fieldId(2)
+            .schema(
+                InternalSchema.builder()
+                    .name("BOOLEAN")
+                    .dataType(InternalType.BOOLEAN)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testTinyIntField() {
+    DataField paimonField = new DataField(3, "tinyint_field", new TinyIntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("tinyint_field")
+            .fieldId(3)
+            .schema(
+                InternalSchema.builder()
+                    .name("TINYINT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testSmallIntField() {
+    DataField paimonField = new DataField(4, "smallint_field", new SmallIntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("smallint_field")
+            .fieldId(4)
+            .schema(
+                InternalSchema.builder()
+                    .name("SMALLINT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testIntField() {
+    DataField paimonField = new DataField(5, "int_field", new IntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("int_field")
+            .fieldId(5)
+            .schema(
+                InternalSchema.builder()
+                    .name("INT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testBigIntField() {
+    DataField paimonField = new DataField(6, "bigint_field", new BigIntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("bigint_field")
+            .fieldId(6)
+            .schema(
+                InternalSchema.builder()
+                    .name("BIGINT")
+                    .dataType(InternalType.LONG)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testFloatField() {
+    DataField paimonField = new DataField(7, "float_field", new FloatType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("float_field")
+            .fieldId(7)
+            .schema(
+                InternalSchema.builder()
+                    .name("FLOAT")
+                    .dataType(InternalType.FLOAT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testDoubleField() {
+    DataField paimonField = new DataField(8, "double_field", new DoubleType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("double_field")
+            .fieldId(8)
+            .schema(
+                InternalSchema.builder()
+                    .name("DOUBLE")
+                    .dataType(InternalType.DOUBLE)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testDateField() {
+    DataField paimonField = new DataField(9, "date_field", new DateType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("date_field")
+            .fieldId(9)
+            .schema(
+                InternalSchema.builder()
+                    .name("DATE")
+                    .dataType(InternalType.DATE)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testTimestampField() {
+    DataField paimonField = new DataField(10, "timestamp_field", new TimestampType(3));
+    Map<InternalSchema.MetadataKey, Object> timestampMetadata =
+        Collections.singletonMap(
+            InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
+    InternalField expectedField =
+        InternalField.builder()
+            .name("timestamp_field")
+            .fieldId(10)
+            .schema(
+                InternalSchema.builder()
+                    .name("TIMESTAMP(3)")
+                    .dataType(InternalType.TIMESTAMP)
+                    .isNullable(true)
+                    .metadata(timestampMetadata)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testDecimalField() {
+    DataField paimonField = new DataField(11, "decimal_field", new DecimalType(10, 2));
+    Map<InternalSchema.MetadataKey, Object> decimalMetadata = new HashMap<>();
+    decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 10);
+    decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 2);
+    InternalField expectedField =
+        InternalField.builder()
+            .name("decimal_field")
+            .fieldId(11)
+            .schema(
+                InternalSchema.builder()
+                    .name("DECIMAL(10, 2)")
+                    .dataType(InternalType.DECIMAL)
+                    .isNullable(true)
+                    .metadata(decimalMetadata)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testStructField() {

Review Comment:
   Nitpick: for my own sanity, I usually like to go three levels deep to ensure that the parent path is not simply the name of the parent field but rather the full path to that field. Can we do that here?
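   Something along these lines could cover that (field ids and names here are made up purely for illustration); the expected `InternalSchema` would then assert the full parent paths, e.g. `outer.middle.inner`:
   
   ```java
   DataField innerField = new DataField(100, "inner", new IntType());
   DataField middleField =
       new DataField(101, "middle", new RowType(Collections.singletonList(innerField)));
   DataField outerField =
       new DataField(102, "outer", new RowType(Collections.singletonList(middleField)));
   ```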



##########
xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonSchemaExtractor.java:
##########
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+
+public class TestPaimonSchemaExtractor {
+  private static final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance();
+
+  private void assertField(DataField paimonField, InternalField expectedInternalField) {
+    assertField(paimonField, expectedInternalField, Collections.emptyList());
+  }
+
+  private void assertField(
+      DataField paimonField, InternalField expectedInternalField, List<String> primaryKeys) {
+    TableSchema paimonSchema =
+        new TableSchema(
+            0,
+            Collections.singletonList(paimonField),
+            0,
+            Collections.emptyList(),
+            primaryKeys,
+            new HashMap<>(),
+            "");
+    InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);
+    List<InternalField> recordKeyFields =
+        primaryKeys.isEmpty()
+            ? Collections.emptyList()
+            : Collections.singletonList(expectedInternalField);
+    InternalSchema expectedSchema =
+        InternalSchema.builder()
+            .name("record")
+            .dataType(InternalType.RECORD)
+            .fields(Collections.singletonList(expectedInternalField))
+            .recordKeyFields(recordKeyFields)
+            .build();
+    assertEquals(expectedSchema, internalSchema);
+  }
+
+  @Test
+  void testCharField() {
+    DataField paimonField = new DataField(0, "char_field", new CharType(10));
+    InternalField expectedField =
+        InternalField.builder()
+            .name("char_field")
+            .fieldId(0)
+            .schema(
+                InternalSchema.builder()
+                    .name("CHAR(10)")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testVarcharField() {
+    DataField paimonField = new DataField(1, "varchar_field", new VarCharType(255));
+    InternalField expectedField =
+        InternalField.builder()
+            .name("varchar_field")
+            .fieldId(1)
+            .schema(
+                InternalSchema.builder()
+                    .name("VARCHAR(255)")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testBooleanField() {
+    DataField paimonField = new DataField(2, "boolean_field", new BooleanType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("boolean_field")
+            .fieldId(2)
+            .schema(
+                InternalSchema.builder()
+                    .name("BOOLEAN")
+                    .dataType(InternalType.BOOLEAN)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testTinyIntField() {
+    DataField paimonField = new DataField(3, "tinyint_field", new TinyIntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("tinyint_field")
+            .fieldId(3)
+            .schema(
+                InternalSchema.builder()
+                    .name("TINYINT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testSmallIntField() {
+    DataField paimonField = new DataField(4, "smallint_field", new SmallIntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("smallint_field")
+            .fieldId(4)
+            .schema(
+                InternalSchema.builder()
+                    .name("SMALLINT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testIntField() {
+    DataField paimonField = new DataField(5, "int_field", new IntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("int_field")
+            .fieldId(5)
+            .schema(
+                InternalSchema.builder()
+                    .name("INT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testBigIntField() {
+    DataField paimonField = new DataField(6, "bigint_field", new BigIntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("bigint_field")
+            .fieldId(6)
+            .schema(
+                InternalSchema.builder()
+                    .name("BIGINT")
+                    .dataType(InternalType.LONG)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testFloatField() {
+    DataField paimonField = new DataField(7, "float_field", new FloatType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("float_field")
+            .fieldId(7)
+            .schema(
+                InternalSchema.builder()
+                    .name("FLOAT")
+                    .dataType(InternalType.FLOAT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testDoubleField() {
+    DataField paimonField = new DataField(8, "double_field", new DoubleType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("double_field")
+            .fieldId(8)
+            .schema(
+                InternalSchema.builder()
+                    .name("DOUBLE")
+                    .dataType(InternalType.DOUBLE)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testDateField() {
+    DataField paimonField = new DataField(9, "date_field", new DateType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("date_field")
+            .fieldId(9)
+            .schema(
+                InternalSchema.builder()
+                    .name("DATE")
+                    .dataType(InternalType.DATE)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testTimestampField() {
+    DataField paimonField = new DataField(10, "timestamp_field", new TimestampType(3));

Review Comment:
   Can you add a test with millis precision as well if that is supported in Paimon?



##########
xtable-service/pom.xml:
##########
@@ -216,6 +216,17 @@
             <scope>test</scope>
         </dependency>
 
+        <!-- Paimon dependencies -->
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-bundle</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-spark-${spark.version.prefix}</artifactId>
+            <version>${paimon.version}</version>

Review Comment:
   We typically define all the versions (even for test dependencies) in the parent pom. Can you move this version declaration to the dependency management section there?
   
   



##########
xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java:
##########
@@ -97,6 +98,15 @@ public static void setupOnce() {
           .sparkContext()
           .hadoopConfiguration()
           .set("parquet.avro.write-old-list-structure", "false");
+      sparkSession
+          .sparkContext()
+          .conf()
+          .set(

Review Comment:
   Can these be set directly on the sparkConf on line 94?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
