the-other-tim-brown commented on code in PR #742: URL: https://github.com/apache/incubator-xtable/pull/742#discussion_r2483771095
########## xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.paimon; + +import static org.apache.xtable.model.storage.DataLayoutStrategy.HIVE_STYLE_PARTITION; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; + +import lombok.extern.log4j.Log4j2; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.SnapshotManager; + +import org.apache.xtable.exception.ReadException; +import org.apache.xtable.model.*; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.PartitionFileGroup; +import org.apache.xtable.model.storage.TableFormat; +import org.apache.xtable.spi.extractor.ConversionSource; + +@Log4j2 +public class PaimonConversionSource implements ConversionSource<Snapshot> { + + private final FileStoreTable paimonTable; + private final SchemaManager schemaManager; + private final SnapshotManager snapshotManager; + + private final PaimonDataFileExtractor dataFileExtractor = PaimonDataFileExtractor.getInstance(); + private final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance(); + private final PaimonPartitionExtractor partitionSpecExtractor = + PaimonPartitionExtractor.getInstance(); + + public PaimonConversionSource(FileStoreTable paimonTable) { + this.paimonTable = paimonTable; + this.schemaManager = paimonTable.schemaManager(); + this.snapshotManager = paimonTable.snapshotManager(); + } + + @Override + public InternalTable getTable(Snapshot snapshot) { + TableSchema paimonSchema = schemaManager.schema(snapshot.schemaId()); + InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema); + + List<String> partitionKeys = paimonTable.partitionKeys(); + List<InternalPartitionField> partitioningFields = + partitionSpecExtractor.toInternalPartitionFields(partitionKeys, internalSchema); + + DataLayoutStrategy dataLayoutStrategy = + partitioningFields.isEmpty() ? 
DataLayoutStrategy.FLAT : HIVE_STYLE_PARTITION; + + return InternalTable.builder() + .name(paimonTable.name()) + .tableFormat(TableFormat.PAIMON) + .readSchema(internalSchema) + .layoutStrategy(dataLayoutStrategy) + .basePath(paimonTable.location().toString()) + .partitioningFields(partitioningFields) + .latestCommitTime(Instant.ofEpochMilli(snapshot.timeMillis())) + .latestMetadataPath(snapshotManager.snapshotPath(snapshot.id()).toString()) + .build(); + } + + @Override + public InternalTable getCurrentTable() { + SnapshotManager snapshotManager = paimonTable.snapshotManager(); + Snapshot snapshot = snapshotManager.latestSnapshot(); + if (snapshot == null) { + throw new ReadException("No snapshots found for table " + paimonTable.name()); + } Review Comment: Nitpick: can we move this code and lines 99-103 into a little private, helper method ########## xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
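For the helper-method nitpick on `PaimonConversionSource` above, a minimal sketch of the extraction (the method name `getLatestSnapshotOrThrow` is illustrative, not something already in the PR):

```java
private Snapshot getLatestSnapshotOrThrow() {
  // single place for the "no snapshots yet" handling shared by getCurrentTable() and getCurrentSnapshot()
  Snapshot snapshot = snapshotManager.latestSnapshot();
  if (snapshot == null) {
    throw new ReadException("No snapshots found for table " + paimonTable.name());
  }
  return snapshot;
}
```

Both call sites could then start with `Snapshot snapshot = getLatestSnapshotOrThrow();` and drop the local `SnapshotManager snapshotManager = paimonTable.snapshotManager();`, since the class already keeps the manager as a field.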
+ */ + +package org.apache.xtable.paimon; + +import static org.apache.xtable.model.storage.DataLayoutStrategy.HIVE_STYLE_PARTITION; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; + +import lombok.extern.log4j.Log4j2; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.SnapshotManager; + +import org.apache.xtable.exception.ReadException; +import org.apache.xtable.model.*; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.PartitionFileGroup; +import org.apache.xtable.model.storage.TableFormat; +import org.apache.xtable.spi.extractor.ConversionSource; + +@Log4j2 +public class PaimonConversionSource implements ConversionSource<Snapshot> { + + private final FileStoreTable paimonTable; + private final SchemaManager schemaManager; + private final SnapshotManager snapshotManager; + + private final PaimonDataFileExtractor dataFileExtractor = PaimonDataFileExtractor.getInstance(); + private final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance(); + private final PaimonPartitionExtractor partitionSpecExtractor = + PaimonPartitionExtractor.getInstance(); + + public PaimonConversionSource(FileStoreTable paimonTable) { + this.paimonTable = paimonTable; + this.schemaManager = paimonTable.schemaManager(); + this.snapshotManager = paimonTable.snapshotManager(); + } + + @Override + public InternalTable getTable(Snapshot snapshot) { + TableSchema paimonSchema = schemaManager.schema(snapshot.schemaId()); + InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema); + + List<String> partitionKeys = paimonTable.partitionKeys(); + List<InternalPartitionField> partitioningFields = + partitionSpecExtractor.toInternalPartitionFields(partitionKeys, internalSchema); + + DataLayoutStrategy dataLayoutStrategy = + partitioningFields.isEmpty() ? DataLayoutStrategy.FLAT : HIVE_STYLE_PARTITION; Review Comment: Does Paimon always include the field name in the path or is there an option to not do this? For example, Hive style means date would be `base/path/data=2025-11-01/data` and not simply `base/path/2025-11-01/data` ########## xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.paimon; + +import java.util.*; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.snapshot.SnapshotReader; + +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.storage.InternalDataFile; + +public class PaimonDataFileExtractor { + + private final PaimonPartitionExtractor partitionExtractor = + PaimonPartitionExtractor.getInstance(); + + private static final PaimonDataFileExtractor INSTANCE = new PaimonDataFileExtractor(); + + public static PaimonDataFileExtractor getInstance() { + return INSTANCE; + } + + public List<InternalDataFile> toInternalDataFiles(FileStoreTable table, Snapshot snapshot) { + List<InternalDataFile> result = new ArrayList<>(); + Iterator<ManifestEntry> manifestEntryIterator = + newSnapshotReader(table, snapshot).readFileIterator(); + while (manifestEntryIterator.hasNext()) { + result.add(toInternalDataFile(table, manifestEntryIterator.next())); + } + return result; + } + + private InternalDataFile toInternalDataFile(FileStoreTable table, ManifestEntry entry) { + return InternalDataFile.builder() + .physicalPath(toFullPhysicalPath(table, entry)) + .fileSizeBytes(entry.file().fileSize()) + .lastModified(entry.file().creationTimeEpochMillis()) + .recordCount(entry.file().rowCount()) + .partitionValues(partitionExtractor.toPartitionValues(table, entry.partition())) + .columnStats(toColumnStats(entry.file())) + .build(); + } + + private String toFullPhysicalPath(FileStoreTable table, ManifestEntry entry) { + String basePath = table.location().toString(); + String bucketPath = "bucket-" + entry.bucket(); + String filePath = entry.file().fileName(); + + Optional<String> partitionPath = partitionExtractor.toPartitionPath(table, entry.partition()); + if (partitionPath.isPresent()) { + return String.join("/", basePath, partitionPath.get(), bucketPath, filePath); + } else { + return String.join("/", basePath, bucketPath, filePath); + } + } + + private List<ColumnStat> toColumnStats(DataFileMeta file) { + // TODO: Implement logic to extract column stats from the file meta Review Comment: Let's track this as a separate GH issue if we are not already. ########## xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.paimon; + +import java.io.IOException; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; + +import org.apache.xtable.conversion.ConversionSourceProvider; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.exception.ReadException; +import org.apache.xtable.spi.extractor.ConversionSource; + +public class PaimonConversionSourceProvider extends ConversionSourceProvider<Snapshot> { + @Override + public ConversionSource<Snapshot> getConversionSourceInstance(SourceTable sourceTableConfig) { + try { + Options catalogOptions = new Options(); + CatalogContext context = CatalogContext.create(catalogOptions, hadoopConf); + + Path path = new Path(sourceTableConfig.getDataPath()); + FileIO fileIO = FileIO.get(path, context); + FileStoreTable paimonTable = FileStoreTableFactory.create(fileIO, path); + + return new PaimonConversionSource(paimonTable); + } catch (IOException e) { + throw new ReadException(e.getMessage()); Review Comment: Let's update this handling so the stacktrace is propagated. `throw new ReadException("Failed to read Paimon table from file system", e)` ########## xtable-core/src/test/java/org/apache/xtable/ITConversionController.java: ########## @@ -104,9 +106,12 @@ import org.apache.xtable.iceberg.TestIcebergDataHelper; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; +import org.apache.xtable.paimon.PaimonConversionSourceProvider; public class ITConversionController { - @TempDir public static Path tempDir; + @TempDir(cleanup = CleanupMode.NEVER) // TODO remove CleanupMode.NEVER after debugging Review Comment: Can this be reverted now? ########## xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
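For the exception-handling comment on `PaimonConversionSourceProvider` above, the method would look roughly like this with the cause propagated (same body as in the PR, only the catch block changes; this assumes `ReadException` exposes a `(String, Throwable)` constructor, as the suggested snippet implies):

```java
@Override
public ConversionSource<Snapshot> getConversionSourceInstance(SourceTable sourceTableConfig) {
  try {
    Options catalogOptions = new Options();
    CatalogContext context = CatalogContext.create(catalogOptions, hadoopConf);

    Path path = new Path(sourceTableConfig.getDataPath());
    FileIO fileIO = FileIO.get(path, context);
    FileStoreTable paimonTable = FileStoreTableFactory.create(fileIO, path);

    return new PaimonConversionSource(paimonTable);
  } catch (IOException e) {
    // pass the IOException as the cause so the original stacktrace is preserved
    throw new ReadException("Failed to read Paimon table from file system", e);
  }
}
```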
+ */ + +package org.apache.xtable.paimon; + +import java.util.*; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.snapshot.SnapshotReader; + +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.storage.InternalDataFile; + +public class PaimonDataFileExtractor { + + private final PaimonPartitionExtractor partitionExtractor = + PaimonPartitionExtractor.getInstance(); + + private static final PaimonDataFileExtractor INSTANCE = new PaimonDataFileExtractor(); + + public static PaimonDataFileExtractor getInstance() { + return INSTANCE; + } + + public List<InternalDataFile> toInternalDataFiles(FileStoreTable table, Snapshot snapshot) { + List<InternalDataFile> result = new ArrayList<>(); + Iterator<ManifestEntry> manifestEntryIterator = + newSnapshotReader(table, snapshot).readFileIterator(); + while (manifestEntryIterator.hasNext()) { + result.add(toInternalDataFile(table, manifestEntryIterator.next())); + } + return result; + } + + private InternalDataFile toInternalDataFile(FileStoreTable table, ManifestEntry entry) { + return InternalDataFile.builder() + .physicalPath(toFullPhysicalPath(table, entry)) + .fileSizeBytes(entry.file().fileSize()) + .lastModified(entry.file().creationTimeEpochMillis()) + .recordCount(entry.file().rowCount()) + .partitionValues(partitionExtractor.toPartitionValues(table, entry.partition())) + .columnStats(toColumnStats(entry.file())) + .build(); + } + + private String toFullPhysicalPath(FileStoreTable table, ManifestEntry entry) { + String basePath = table.location().toString(); + String bucketPath = "bucket-" + entry.bucket(); + String filePath = entry.file().fileName(); + + Optional<String> partitionPath = partitionExtractor.toPartitionPath(table, entry.partition()); + if (partitionPath.isPresent()) { + return String.join("/", basePath, partitionPath.get(), bucketPath, filePath); + } else { + return String.join("/", basePath, bucketPath, filePath); + } + } + + private List<ColumnStat> toColumnStats(DataFileMeta file) { + // TODO: Implement logic to extract column stats from the file meta + return Collections.emptyList(); + } + + private SnapshotReader newSnapshotReader(FileStoreTable table, Snapshot snapshot) { + // If the table has primary keys, we read only the top level files Review Comment: Just curious, is this similar to the Hudi Merge on Read table? ########## xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.paimon; + +import java.io.IOException; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; + +import org.apache.xtable.conversion.ConversionSourceProvider; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.exception.ReadException; +import org.apache.xtable.spi.extractor.ConversionSource; + +public class PaimonConversionSourceProvider extends ConversionSourceProvider<Snapshot> { + @Override + public ConversionSource<Snapshot> getConversionSourceInstance(SourceTable sourceTableConfig) { + try { + Options catalogOptions = new Options(); Review Comment: We don't need any changes right now but will the user want to supply some custom options here? ########## xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.paimon; + +import static org.apache.xtable.model.storage.DataLayoutStrategy.HIVE_STYLE_PARTITION; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; + +import lombok.extern.log4j.Log4j2; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.SnapshotManager; + +import org.apache.xtable.exception.ReadException; +import org.apache.xtable.model.*; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.PartitionFileGroup; +import org.apache.xtable.model.storage.TableFormat; +import org.apache.xtable.spi.extractor.ConversionSource; + +@Log4j2 +public class PaimonConversionSource implements ConversionSource<Snapshot> { + + private final FileStoreTable paimonTable; + private final SchemaManager schemaManager; + private final SnapshotManager snapshotManager; + + private final PaimonDataFileExtractor dataFileExtractor = PaimonDataFileExtractor.getInstance(); + private final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance(); + private final PaimonPartitionExtractor partitionSpecExtractor = + PaimonPartitionExtractor.getInstance(); + + public PaimonConversionSource(FileStoreTable paimonTable) { + this.paimonTable = paimonTable; + this.schemaManager = paimonTable.schemaManager(); + this.snapshotManager = paimonTable.snapshotManager(); + } + + @Override + public InternalTable getTable(Snapshot snapshot) { + TableSchema paimonSchema = schemaManager.schema(snapshot.schemaId()); + InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema); + + List<String> partitionKeys = paimonTable.partitionKeys(); + List<InternalPartitionField> partitioningFields = + partitionSpecExtractor.toInternalPartitionFields(partitionKeys, internalSchema); + + DataLayoutStrategy dataLayoutStrategy = + partitioningFields.isEmpty() ? 
DataLayoutStrategy.FLAT : HIVE_STYLE_PARTITION; + + return InternalTable.builder() + .name(paimonTable.name()) + .tableFormat(TableFormat.PAIMON) + .readSchema(internalSchema) + .layoutStrategy(dataLayoutStrategy) + .basePath(paimonTable.location().toString()) + .partitioningFields(partitioningFields) + .latestCommitTime(Instant.ofEpochMilli(snapshot.timeMillis())) + .latestMetadataPath(snapshotManager.snapshotPath(snapshot.id()).toString()) + .build(); + } + + @Override + public InternalTable getCurrentTable() { + SnapshotManager snapshotManager = paimonTable.snapshotManager(); + Snapshot snapshot = snapshotManager.latestSnapshot(); + if (snapshot == null) { + throw new ReadException("No snapshots found for table " + paimonTable.name()); + } + return getTable(snapshot); + } + + @Override + public InternalSnapshot getCurrentSnapshot() { + SnapshotManager snapshotManager = paimonTable.snapshotManager(); + Snapshot snapshot = snapshotManager.latestSnapshot(); + if (snapshot == null) { + throw new ReadException("No snapshots found for table " + paimonTable.name()); + } + + InternalTable internalTable = getTable(snapshot); + List<InternalDataFile> internalDataFiles = + dataFileExtractor.toInternalDataFiles(paimonTable, snapshot); + + return InternalSnapshot.builder() + .table(internalTable) + .version(Long.toString(snapshot.timeMillis())) + .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles)) + // TODO : Implement pending commits extraction, required for incremental sync Review Comment: Nitpick: Can you create a GH Issue to track the incremental sync work if one does not already exist? ########## xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.paimon; + +import java.util.*; +import java.util.stream.Collectors; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.InternalRowPartitionComputer; + +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.PartitionTransformType; +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.Range; + +/** Extracts partition spec for Paimon as identity transforms on partition keys. 
*/ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class PaimonPartitionExtractor { + + private final PaimonSchemaExtractor paimonSchemaExtractor = PaimonSchemaExtractor.getInstance(); + + private static final PaimonPartitionExtractor INSTANCE = new PaimonPartitionExtractor(); + + public static PaimonPartitionExtractor getInstance() { + return INSTANCE; + } + + public List<InternalPartitionField> toInternalPartitionFields( + List<String> partitionKeys, InternalSchema schema) { + if (partitionKeys == null || partitionKeys.isEmpty()) { + return Collections.emptyList(); + } + return partitionKeys.stream() + .map(key -> toPartitionField(key, schema)) + .collect(Collectors.toList()); + } + + public List<PartitionValue> toPartitionValues(FileStoreTable table, BinaryRow partition) { + InternalRowPartitionComputer partitionComputer = newPartitionComputer(table); + InternalSchema internalSchema = paimonSchemaExtractor.toInternalSchema(table.schema()); Review Comment: Can we pass in the schema to avoid this conversion per file? ########## xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.paimon; + +import java.util.*; +import java.util.stream.Collectors; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.InternalRowPartitionComputer; + +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.PartitionTransformType; +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.Range; + +/** Extracts partition spec for Paimon as identity transforms on partition keys. 
*/ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class PaimonPartitionExtractor { + + private final PaimonSchemaExtractor paimonSchemaExtractor = PaimonSchemaExtractor.getInstance(); + + private static final PaimonPartitionExtractor INSTANCE = new PaimonPartitionExtractor(); + + public static PaimonPartitionExtractor getInstance() { + return INSTANCE; + } + + public List<InternalPartitionField> toInternalPartitionFields( + List<String> partitionKeys, InternalSchema schema) { + if (partitionKeys == null || partitionKeys.isEmpty()) { + return Collections.emptyList(); + } + return partitionKeys.stream() + .map(key -> toPartitionField(key, schema)) + .collect(Collectors.toList()); + } + + public List<PartitionValue> toPartitionValues(FileStoreTable table, BinaryRow partition) { + InternalRowPartitionComputer partitionComputer = newPartitionComputer(table); + InternalSchema internalSchema = paimonSchemaExtractor.toInternalSchema(table.schema()); + + List<PartitionValue> partitionValues = new ArrayList<>(); Review Comment: Nitpick: let's establish the list with the size of the map produced by `partitionComputer.generatePartValues(partition)` - this is done per file so we want to make sure we're as lean as possible when running the initial snapshot syncs on very large tables. We can also return Collections.emptyList when the table is not partitioned. ########## xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.paimon; + +import java.util.*; +import java.util.stream.Collectors; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.InternalRowPartitionComputer; + +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.PartitionTransformType; +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.Range; + +/** Extracts partition spec for Paimon as identity transforms on partition keys. 
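For the two nitpicks on `toPartitionValues` above (passing the schema in instead of re-deriving it per file, and pre-sizing the list / short-circuiting unpartitioned tables), a rough sketch. The changed signature is illustrative; the caller in `PaimonDataFileExtractor` would pass the schema it has already extracted:

```java
public List<PartitionValue> toPartitionValues(
    FileStoreTable table, InternalSchema internalSchema, BinaryRow partition) {
  if (table.partitionKeys().isEmpty()) {
    // unpartitioned table: nothing to compute for each file
    return Collections.emptyList();
  }
  InternalRowPartitionComputer partitionComputer = newPartitionComputer(table);
  Map<String, String> partValues = partitionComputer.generatePartValues(partition);
  // pre-size since this runs once per data file during the initial snapshot sync
  List<PartitionValue> partitionValues = new ArrayList<>(partValues.size());
  for (Map.Entry<String, String> entry : partValues.entrySet()) {
    partitionValues.add(
        PartitionValue.builder()
            .partitionField(toPartitionField(entry.getKey(), internalSchema))
            .range(Range.scalar(entry.getValue()))
            .build());
  }
  return partitionValues;
}
```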
*/ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class PaimonPartitionExtractor { + + private final PaimonSchemaExtractor paimonSchemaExtractor = PaimonSchemaExtractor.getInstance(); + + private static final PaimonPartitionExtractor INSTANCE = new PaimonPartitionExtractor(); + + public static PaimonPartitionExtractor getInstance() { + return INSTANCE; + } + + public List<InternalPartitionField> toInternalPartitionFields( + List<String> partitionKeys, InternalSchema schema) { + if (partitionKeys == null || partitionKeys.isEmpty()) { + return Collections.emptyList(); + } + return partitionKeys.stream() + .map(key -> toPartitionField(key, schema)) + .collect(Collectors.toList()); + } + + public List<PartitionValue> toPartitionValues(FileStoreTable table, BinaryRow partition) { + InternalRowPartitionComputer partitionComputer = newPartitionComputer(table); + InternalSchema internalSchema = paimonSchemaExtractor.toInternalSchema(table.schema()); + + List<PartitionValue> partitionValues = new ArrayList<>(); + for (Map.Entry<String, String> entry : + partitionComputer.generatePartValues(partition).entrySet()) { + PartitionValue partitionValue = + PartitionValue.builder() + .partitionField(toPartitionField(entry.getKey(), internalSchema)) + .range(Range.scalar(entry.getValue())) + .build(); + partitionValues.add(partitionValue); + } + return partitionValues; + } + + public Optional<String> toPartitionPath(FileStoreTable table, BinaryRow partition) { + InternalRowPartitionComputer partitionComputer = newPartitionComputer(table); + return partitionComputer.generatePartValues(partition).entrySet().stream() + .map(e -> e.getKey() + "=" + e.getValue()) + .reduce((a, b) -> a + "/" + b); + } + + private InternalPartitionField toPartitionField(String key, InternalSchema schema) { + InternalField sourceField = + findField(schema, key) + .orElseThrow( + () -> new IllegalArgumentException("Partition key not found in schema: " + key)); + return InternalPartitionField.builder() + .sourceField(sourceField) + .transformType(PartitionTransformType.VALUE) Review Comment: Does Paimon allow users to partition on a timestamp? ########## xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonSchemaExtractor.java: ########## @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.paimon; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarCharType; +import org.junit.jupiter.api.Test; + +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; + +public class TestPaimonSchemaExtractor { + private static final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance(); + + private void assertField(DataField paimonField, InternalField expectedInternalField) { + assertField(paimonField, expectedInternalField, Collections.emptyList()); + } + + private void assertField( + DataField paimonField, InternalField expectedInternalField, List<String> primaryKeys) { + TableSchema paimonSchema = + new TableSchema( + 0, + Collections.singletonList(paimonField), + 0, + Collections.emptyList(), + primaryKeys, + new HashMap<>(), + ""); + InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema); + List<InternalField> recordKeyFields = + primaryKeys.isEmpty() + ? 
Collections.emptyList() + : Collections.singletonList(expectedInternalField); + InternalSchema expectedSchema = + InternalSchema.builder() + .name("record") + .dataType(InternalType.RECORD) + .fields(Collections.singletonList(expectedInternalField)) + .recordKeyFields(recordKeyFields) + .build(); + assertEquals(expectedSchema, internalSchema); + } + + @Test + void testCharField() { + DataField paimonField = new DataField(0, "char_field", new CharType(10)); + InternalField expectedField = + InternalField.builder() + .name("char_field") + .fieldId(0) + .schema( + InternalSchema.builder() + .name("CHAR(10)") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testVarcharField() { + DataField paimonField = new DataField(1, "varchar_field", new VarCharType(255)); + InternalField expectedField = + InternalField.builder() + .name("varchar_field") + .fieldId(1) + .schema( + InternalSchema.builder() + .name("VARCHAR(255)") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testBooleanField() { + DataField paimonField = new DataField(2, "boolean_field", new BooleanType()); + InternalField expectedField = + InternalField.builder() + .name("boolean_field") + .fieldId(2) + .schema( + InternalSchema.builder() + .name("BOOLEAN") + .dataType(InternalType.BOOLEAN) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testTinyIntField() { + DataField paimonField = new DataField(3, "tinyint_field", new TinyIntType()); + InternalField expectedField = + InternalField.builder() + .name("tinyint_field") + .fieldId(3) + .schema( + InternalSchema.builder() + .name("TINYINT") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testSmallIntField() { + DataField paimonField = new DataField(4, "smallint_field", new SmallIntType()); + InternalField expectedField = + InternalField.builder() + .name("smallint_field") + .fieldId(4) + .schema( + InternalSchema.builder() + .name("SMALLINT") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testIntField() { + DataField paimonField = new DataField(5, "int_field", new IntType()); + InternalField expectedField = + InternalField.builder() + .name("int_field") + .fieldId(5) + .schema( + InternalSchema.builder() + .name("INT") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testBigIntField() { + DataField paimonField = new DataField(6, "bigint_field", new BigIntType()); + InternalField expectedField = + InternalField.builder() + .name("bigint_field") + .fieldId(6) + .schema( + InternalSchema.builder() + .name("BIGINT") + .dataType(InternalType.LONG) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testFloatField() { + 
DataField paimonField = new DataField(7, "float_field", new FloatType()); + InternalField expectedField = + InternalField.builder() + .name("float_field") + .fieldId(7) + .schema( + InternalSchema.builder() + .name("FLOAT") + .dataType(InternalType.FLOAT) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testDoubleField() { + DataField paimonField = new DataField(8, "double_field", new DoubleType()); + InternalField expectedField = + InternalField.builder() + .name("double_field") + .fieldId(8) + .schema( + InternalSchema.builder() + .name("DOUBLE") + .dataType(InternalType.DOUBLE) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testDateField() { + DataField paimonField = new DataField(9, "date_field", new DateType()); + InternalField expectedField = + InternalField.builder() + .name("date_field") + .fieldId(9) + .schema( + InternalSchema.builder() + .name("DATE") + .dataType(InternalType.DATE) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testTimestampField() { + DataField paimonField = new DataField(10, "timestamp_field", new TimestampType(3)); + Map<InternalSchema.MetadataKey, Object> timestampMetadata = + Collections.singletonMap( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS); + InternalField expectedField = + InternalField.builder() + .name("timestamp_field") + .fieldId(10) + .schema( + InternalSchema.builder() + .name("TIMESTAMP(3)") + .dataType(InternalType.TIMESTAMP) + .isNullable(true) + .metadata(timestampMetadata) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testDecimalField() { + DataField paimonField = new DataField(11, "decimal_field", new DecimalType(10, 2)); + Map<InternalSchema.MetadataKey, Object> decimalMetadata = new HashMap<>(); + decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 10); + decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 2); + InternalField expectedField = + InternalField.builder() + .name("decimal_field") + .fieldId(11) + .schema( + InternalSchema.builder() + .name("DECIMAL(10, 2)") + .dataType(InternalType.DECIMAL) + .isNullable(true) + .metadata(decimalMetadata) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testStructField() { Review Comment: Nitpick: for my own sanity, I usually like to go three levels deep to ensure that the parent path is not simply the name of the parent field but rather the full path to that field. Can we do that here? ########## xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonSchemaExtractor.java: ########## @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
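For the three-levels-deep nitpick on `testStructField` above, a sketch of what such a test could look like. The field names and ids are made up, and the final assertion assumes the extractor records the full dotted parent path on nested fields, which is exactly what the comment wants verified:

```java
@Test
void testThreeLevelStructField() {
  // outer_struct.middle_struct.leaf_int -- three levels deep
  DataField leaf = new DataField(14, "leaf_int", new IntType());
  DataField middle =
      new DataField(13, "middle_struct", new RowType(Collections.singletonList(leaf)));
  DataField outer =
      new DataField(12, "outer_struct", new RowType(Collections.singletonList(middle)));

  TableSchema paimonSchema =
      new TableSchema(
          0,
          Collections.singletonList(outer),
          0,
          Collections.emptyList(),
          Collections.emptyList(),
          new HashMap<>(),
          "");
  InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);

  InternalField outerField = internalSchema.getFields().get(0);
  InternalField middleField = outerField.getSchema().getFields().get(0);
  InternalField leafField = middleField.getSchema().getFields().get(0);
  assertEquals("leaf_int", leafField.getName());
  // the parent path should be the full path to the field, not just the immediate parent's name
  assertEquals("outer_struct.middle_struct", leafField.getParentPath());
}
```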
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.paimon; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarCharType; +import org.junit.jupiter.api.Test; + +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; + +public class TestPaimonSchemaExtractor { + private static final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance(); + + private void assertField(DataField paimonField, InternalField expectedInternalField) { + assertField(paimonField, expectedInternalField, Collections.emptyList()); + } + + private void assertField( + DataField paimonField, InternalField expectedInternalField, List<String> primaryKeys) { + TableSchema paimonSchema = + new TableSchema( + 0, + Collections.singletonList(paimonField), + 0, + Collections.emptyList(), + primaryKeys, + new HashMap<>(), + ""); + InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema); + List<InternalField> recordKeyFields = + primaryKeys.isEmpty() + ? 
Collections.emptyList() + : Collections.singletonList(expectedInternalField); + InternalSchema expectedSchema = + InternalSchema.builder() + .name("record") + .dataType(InternalType.RECORD) + .fields(Collections.singletonList(expectedInternalField)) + .recordKeyFields(recordKeyFields) + .build(); + assertEquals(expectedSchema, internalSchema); + } + + @Test + void testCharField() { + DataField paimonField = new DataField(0, "char_field", new CharType(10)); + InternalField expectedField = + InternalField.builder() + .name("char_field") + .fieldId(0) + .schema( + InternalSchema.builder() + .name("CHAR(10)") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testVarcharField() { + DataField paimonField = new DataField(1, "varchar_field", new VarCharType(255)); + InternalField expectedField = + InternalField.builder() + .name("varchar_field") + .fieldId(1) + .schema( + InternalSchema.builder() + .name("VARCHAR(255)") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testBooleanField() { + DataField paimonField = new DataField(2, "boolean_field", new BooleanType()); + InternalField expectedField = + InternalField.builder() + .name("boolean_field") + .fieldId(2) + .schema( + InternalSchema.builder() + .name("BOOLEAN") + .dataType(InternalType.BOOLEAN) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testTinyIntField() { + DataField paimonField = new DataField(3, "tinyint_field", new TinyIntType()); + InternalField expectedField = + InternalField.builder() + .name("tinyint_field") + .fieldId(3) + .schema( + InternalSchema.builder() + .name("TINYINT") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testSmallIntField() { + DataField paimonField = new DataField(4, "smallint_field", new SmallIntType()); + InternalField expectedField = + InternalField.builder() + .name("smallint_field") + .fieldId(4) + .schema( + InternalSchema.builder() + .name("SMALLINT") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testIntField() { + DataField paimonField = new DataField(5, "int_field", new IntType()); + InternalField expectedField = + InternalField.builder() + .name("int_field") + .fieldId(5) + .schema( + InternalSchema.builder() + .name("INT") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testBigIntField() { + DataField paimonField = new DataField(6, "bigint_field", new BigIntType()); + InternalField expectedField = + InternalField.builder() + .name("bigint_field") + .fieldId(6) + .schema( + InternalSchema.builder() + .name("BIGINT") + .dataType(InternalType.LONG) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testFloatField() { + 
DataField paimonField = new DataField(7, "float_field", new FloatType()); + InternalField expectedField = + InternalField.builder() + .name("float_field") + .fieldId(7) + .schema( + InternalSchema.builder() + .name("FLOAT") + .dataType(InternalType.FLOAT) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testDoubleField() { + DataField paimonField = new DataField(8, "double_field", new DoubleType()); + InternalField expectedField = + InternalField.builder() + .name("double_field") + .fieldId(8) + .schema( + InternalSchema.builder() + .name("DOUBLE") + .dataType(InternalType.DOUBLE) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testDateField() { + DataField paimonField = new DataField(9, "date_field", new DateType()); + InternalField expectedField = + InternalField.builder() + .name("date_field") + .fieldId(9) + .schema( + InternalSchema.builder() + .name("DATE") + .dataType(InternalType.DATE) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testTimestampField() { + DataField paimonField = new DataField(10, "timestamp_field", new TimestampType(3)); Review Comment: Can you add a test with millis precision as well if that is supported in Paimon? ########## xtable-service/pom.xml: ########## @@ -216,6 +216,17 @@ <scope>test</scope> </dependency> + <!-- Paimon dependencies --> + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-bundle</artifactId> + </dependency> + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-spark-${spark.version.prefix}</artifactId> + <version>${paimon.version}</version> Review Comment: We typically define all the versions (even for test dependences) in the parent pom. Can you move this version declaration to the dependency management section there? ########## xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java: ########## @@ -97,6 +98,15 @@ public static void setupOnce() { .sparkContext() .hadoopConfiguration() .set("parquet.avro.write-old-list-structure", "false"); + sparkSession + .sparkContext() + .conf() + .set( Review Comment: Can these be set directly on the sparkConf on line 94? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
