This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new 8cceec71 Paimon Source Support (#742)
8cceec71 is described below
commit 8cceec7169012ee2cde2b9f1a72dfb49dc51588c
Author: Mike Dias <[email protected]>
AuthorDate: Thu Nov 6 09:30:23 2025 +1100
Paimon Source Support (#742)
* Paimon initial support
# Conflicts:
# xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
# xtable-core/src/test/java/org/apache/xtable/GenericTable.java
# xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
* Expanding imports and removing java 11 target
* fix compilation issue
* Add Paimon Unit Tests
* Addressing review comments
* Fix test
* Parameterizing timestamp precision tests
* Fix timestamp precision metadata
* Fixing tests by removing the paimon catalog config in spark
---
.gitignore | 1 +
pom.xml | 13 +
.../apache/xtable/model/storage/TableFormat.java | 3 +-
xtable-core/pom.xml | 11 +
.../xtable/paimon/PaimonConversionSource.java | 138 ++++++
.../paimon/PaimonConversionSourceProvider.java | 52 ++
.../xtable/paimon/PaimonDataFileExtractor.java | 99 ++++
.../xtable/paimon/PaimonPartitionExtractor.java | 112 +++++
.../xtable/paimon/PaimonSchemaExtractor.java | 231 +++++++++
.../test/java/org/apache/xtable/GenericTable.java | 7 +
.../org/apache/xtable/ITConversionController.java | 94 ++--
.../java/org/apache/xtable/TestPaimonTable.java | 307 ++++++++++++
.../xtable/paimon/TestPaimonConversionSource.java | 269 ++++++++++
.../xtable/paimon/TestPaimonDataFileExtractor.java | 183 +++++++
.../paimon/TestPaimonPartitionExtractor.java | 196 ++++++++
.../xtable/paimon/TestPaimonSchemaExtractor.java | 547 +++++++++++++++++++++
xtable-service/pom.xml | 10 +
.../apache/xtable/service/ITConversionService.java | 10 +-
18 files changed, 2244 insertions(+), 39 deletions(-)
diff --git a/.gitignore b/.gitignore
index 5a59990d..3e0130df 100644
--- a/.gitignore
+++ b/.gitignore
@@ -26,6 +26,7 @@ hs_err_pid*
# Ignore java-version and idea files.
.java-version
.idea
+.vscode
# Ignore Gradle project-specific cache directory
.gradle
diff --git a/pom.xml b/pom.xml
index ff80b954..f232c1ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
<spark.version.prefix>3.4</spark.version.prefix>
<iceberg.version>1.4.2</iceberg.version>
<delta.version>2.4.0</delta.version>
+ <paimon.version>1.2.0</paimon.version>
<jackson.version>2.18.2</jackson.version>
<spotless.version>2.43.0</spotless.version>
<apache.rat.version>0.16.1</apache.rat.version>
@@ -333,6 +334,18 @@
<version>${delta.hive.version}</version>
</dependency>
+ <!-- Paimon -->
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-bundle</artifactId>
+ <version>${paimon.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-spark-${spark.version.prefix}</artifactId>
+ <version>${paimon.version}</version>
+ </dependency>
+
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
index 9d89de6a..9ea7943a 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
@@ -27,9 +27,10 @@ public class TableFormat {
public static final String HUDI = "HUDI";
public static final String ICEBERG = "ICEBERG";
public static final String DELTA = "DELTA";
+ public static final String PAIMON = "PAIMON";
public static final String PARQUET = "PARQUET";
public static String[] values() {
- return new String[] {"HUDI", "ICEBERG", "DELTA"};
+ return new String[] {"HUDI", "ICEBERG", "DELTA", "PAIMON"};
}
}
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index 6bd5282c..ce9aaeaf 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -110,6 +110,17 @@
<scope>test</scope>
</dependency>
+ <!-- Paimon dependencies -->
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-bundle</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-spark-${spark.version.prefix}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<!-- Hadoop dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java
new file mode 100644
index 00000000..1ef6dd99
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Log4j2
+public class PaimonConversionSource implements ConversionSource<Snapshot> {
+
+ private final FileStoreTable paimonTable;
+ private final SchemaManager schemaManager;
+ private final SnapshotManager snapshotManager;
+
+  private final PaimonDataFileExtractor dataFileExtractor = PaimonDataFileExtractor.getInstance();
+  private final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance();
+ private final PaimonPartitionExtractor partitionSpecExtractor =
+ PaimonPartitionExtractor.getInstance();
+
+ public PaimonConversionSource(FileStoreTable paimonTable) {
+ this.paimonTable = paimonTable;
+ this.schemaManager = paimonTable.schemaManager();
+ this.snapshotManager = paimonTable.snapshotManager();
+ }
+
+ @Override
+ public InternalTable getTable(Snapshot snapshot) {
+ TableSchema paimonSchema = schemaManager.schema(snapshot.schemaId());
+    InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);
+
+ List<String> partitionKeys = paimonTable.partitionKeys();
+ List<InternalPartitionField> partitioningFields =
+        partitionSpecExtractor.toInternalPartitionFields(partitionKeys, internalSchema);
+
+ return InternalTable.builder()
+ .name(paimonTable.name())
+ .tableFormat(TableFormat.PAIMON)
+ .readSchema(internalSchema)
+ .layoutStrategy(DataLayoutStrategy.HIVE_STYLE_PARTITION)
+ .basePath(paimonTable.location().toString())
+ .partitioningFields(partitioningFields)
+ .latestCommitTime(Instant.ofEpochMilli(snapshot.timeMillis()))
+        .latestMetadataPath(snapshotManager.snapshotPath(snapshot.id()).toString())
+ .build();
+ }
+
+ @Override
+ public InternalTable getCurrentTable() {
+ Snapshot snapshot = getLastSnapshot();
+ return getTable(snapshot);
+ }
+
+ @Override
+ public InternalSnapshot getCurrentSnapshot() {
+ Snapshot snapshot = getLastSnapshot();
+ InternalTable internalTable = getTable(snapshot);
+ InternalSchema internalSchema = internalTable.getReadSchema();
+ List<InternalDataFile> dataFiles =
+        dataFileExtractor.toInternalDataFiles(paimonTable, snapshot, internalSchema);
+
+ return InternalSnapshot.builder()
+ .table(internalTable)
+ .version(Long.toString(snapshot.timeMillis()))
+ .partitionedDataFiles(PartitionFileGroup.fromFiles(dataFiles))
+      // TODO : Implement pending commits extraction, required for incremental sync
+ // https://github.com/apache/incubator-xtable/issues/754
+ .sourceIdentifier(getCommitIdentifier(snapshot))
+ .build();
+ }
+
+ private Snapshot getLastSnapshot() {
+ SnapshotManager snapshotManager = paimonTable.snapshotManager();
+ Snapshot snapshot = snapshotManager.latestSnapshot();
+ if (snapshot == null) {
+      throw new ReadException("No snapshots found for table " + paimonTable.name());
+ }
+ return snapshot;
+ }
+
+ @Override
+ public TableChange getTableChangeForCommit(Snapshot snapshot) {
+    throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
+ }
+
+ @Override
+ public CommitsBacklog<Snapshot> getCommitsBacklog(
+ InstantsForIncrementalSync instantsForIncrementalSync) {
+    throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
+ }
+
+ @Override
+ public boolean isIncrementalSyncSafeFrom(Instant instant) {
+ return false; // Incremental sync is not supported yet
+ }
+
+ @Override
+ public String getCommitIdentifier(Snapshot snapshot) {
+ return Long.toString(snapshot.commitIdentifier());
+ }
+
+ @Override
+ public void close() throws IOException {}
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java
new file mode 100644
index 00000000..64f16906
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import java.io.IOException;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+public class PaimonConversionSourceProvider extends ConversionSourceProvider<Snapshot> {
+ @Override
+  public ConversionSource<Snapshot> getConversionSourceInstance(SourceTable sourceTableConfig) {
+ try {
+ Options catalogOptions = new Options();
+      CatalogContext context = CatalogContext.create(catalogOptions, hadoopConf);
+
+ Path path = new Path(sourceTableConfig.getDataPath());
+ FileIO fileIO = FileIO.get(path, context);
+ FileStoreTable paimonTable = FileStoreTableFactory.create(fileIO, path);
+
+ return new PaimonConversionSource(paimonTable);
+ } catch (IOException e) {
+      throw new ReadException("Failed to read Paimon table from file system", e);
+ }
+ }
+}
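Note: for anyone trying the new source end to end, a minimal sketch of how the provider above could be wired into a sync run, mirroring how the Hudi/Delta/Iceberg providers are driven through ConversionController elsewhere in the project. The paths and table names are made up, and the builder fields shown are assumptions about the current xtable-api surface, not part of this patch:

    // Hypothetical wiring; only PaimonConversionSourceProvider itself comes from this commit.
    Configuration hadoopConf = new Configuration();
    ConversionSourceProvider<org.apache.paimon.Snapshot> provider = new PaimonConversionSourceProvider();
    provider.init(hadoopConf);

    SourceTable sourceTable =
        SourceTable.builder()
            .name("test_table")
            .formatName(TableFormat.PAIMON)
            .basePath("file:///tmp/warehouse/test_db.db/test_table")
            .build();
    TargetTable targetTable =
        TargetTable.builder()
            .name("test_table")
            .formatName(TableFormat.ICEBERG)
            .basePath("file:///tmp/warehouse/test_db.db/test_table")
            .build();

    ConversionConfig conversionConfig =
        ConversionConfig.builder()
            .sourceTable(sourceTable)
            .targetTables(Collections.singletonList(targetTable))
            .syncMode(SyncMode.FULL)
            .build();
    new ConversionController(hadoopConf).sync(conversionConfig, provider);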
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
new file mode 100644
index 00000000..68ccfc3e
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import java.util.*;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+public class PaimonDataFileExtractor {
+
+ private final PaimonPartitionExtractor partitionExtractor =
+ PaimonPartitionExtractor.getInstance();
+
+  private static final PaimonDataFileExtractor INSTANCE = new PaimonDataFileExtractor();
+
+ public static PaimonDataFileExtractor getInstance() {
+ return INSTANCE;
+ }
+
+ public List<InternalDataFile> toInternalDataFiles(
+ FileStoreTable table, Snapshot snapshot, InternalSchema internalSchema) {
+ List<InternalDataFile> result = new ArrayList<>();
+ Iterator<ManifestEntry> manifestEntryIterator =
+ newSnapshotReader(table, snapshot).readFileIterator();
+ while (manifestEntryIterator.hasNext()) {
+      result.add(toInternalDataFile(table, manifestEntryIterator.next(), internalSchema));
+ }
+ return result;
+ }
+
+ private InternalDataFile toInternalDataFile(
+      FileStoreTable table, ManifestEntry entry, InternalSchema internalSchema) {
+ return InternalDataFile.builder()
+ .physicalPath(toFullPhysicalPath(table, entry))
+ .fileSizeBytes(entry.file().fileSize())
+ .lastModified(entry.file().creationTimeEpochMillis())
+ .recordCount(entry.file().rowCount())
+ .partitionValues(
+            partitionExtractor.toPartitionValues(table, entry.partition(), internalSchema))
+ .columnStats(toColumnStats(entry.file()))
+ .build();
+ }
+
+  private String toFullPhysicalPath(FileStoreTable table, ManifestEntry entry) {
+ String basePath = table.location().toString();
+ String bucketPath = "bucket-" + entry.bucket();
+ String filePath = entry.file().fileName();
+
+    Optional<String> partitionPath = partitionExtractor.toPartitionPath(table, entry.partition());
+ if (partitionPath.isPresent()) {
+      return String.join("/", basePath, partitionPath.get(), bucketPath, filePath);
+ } else {
+ return String.join("/", basePath, bucketPath, filePath);
+ }
+ }
+
+ private List<ColumnStat> toColumnStats(DataFileMeta file) {
+ // TODO: Implement logic to extract column stats from the file meta
+ // https://github.com/apache/incubator-xtable/issues/755
+ return Collections.emptyList();
+ }
+
+  private SnapshotReader newSnapshotReader(FileStoreTable table, Snapshot snapshot) {
+ // If the table has primary keys, we read only the top level files
+ // which means we can only consider fully compacted files.
+ if (!table.schema().primaryKeys().isEmpty()) {
+ return table
+ .newSnapshotReader()
+ .withLevel(table.coreOptions().numLevels() - 1)
+ .withSnapshot(snapshot);
+ } else {
+ return table.newSnapshotReader().withSnapshot(snapshot);
+ }
+ }
+}
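For context on toFullPhysicalPath above: each manifest entry resolves to <basePath>/<partitionPath>/bucket-<N>/<fileName>, with the partition segment omitted for unpartitioned tables. A small sketch with made-up values (the file name is hypothetical):

    // Illustrative values only; the join mirrors what toFullPhysicalPath produces.
    String basePath = "file:///tmp/warehouse/test_db.db/test_table";
    String partitionPath = "level=INFO"; // from PaimonPartitionExtractor.toPartitionPath; absent for unpartitioned tables
    String bucketPath = "bucket-0";
    String fileName = "data-0.parquet";  // hypothetical data file name
    String fullPath = String.join("/", basePath, partitionPath, bucketPath, fileName);
    // => file:///tmp/warehouse/test_db.db/test_table/level=INFO/bucket-0/data-0.parquet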
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java
new file mode 100644
index 00000000..c8e6161e
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+
+/** Extracts partition spec for Paimon as identity transforms on partition keys. */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PaimonPartitionExtractor {
+
+  private static final PaimonPartitionExtractor INSTANCE = new PaimonPartitionExtractor();
+
+ public static PaimonPartitionExtractor getInstance() {
+ return INSTANCE;
+ }
+
+ public List<InternalPartitionField> toInternalPartitionFields(
+ List<String> partitionKeys, InternalSchema schema) {
+ if (partitionKeys == null || partitionKeys.isEmpty()) {
+ return Collections.emptyList();
+ }
+ return partitionKeys.stream()
+ .map(key -> toPartitionField(key, schema))
+ .collect(Collectors.toList());
+ }
+
+ public List<PartitionValue> toPartitionValues(
+      FileStoreTable table, BinaryRow partition, InternalSchema internalSchema) {
+    InternalRowPartitionComputer partitionComputer = newPartitionComputer(table);
+    Map<String, String> partValues = partitionComputer.generatePartValues(partition);
+
+ if (partValues.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<PartitionValue> partitionValues = new ArrayList<>(partValues.size());
+ for (Map.Entry<String, String> entry : partValues.entrySet()) {
+ PartitionValue partitionValue =
+ PartitionValue.builder()
+ .partitionField(toPartitionField(entry.getKey(), internalSchema))
+ .range(Range.scalar(entry.getValue()))
+ .build();
+ partitionValues.add(partitionValue);
+ }
+ return partitionValues;
+ }
+
+  public Optional<String> toPartitionPath(FileStoreTable table, BinaryRow partition) {
+    InternalRowPartitionComputer partitionComputer = newPartitionComputer(table);
+ return partitionComputer.generatePartValues(partition).entrySet().stream()
+ .map(e -> e.getKey() + "=" + e.getValue())
+ .reduce((a, b) -> a + "/" + b);
+ }
+
+  private InternalPartitionField toPartitionField(String key, InternalSchema schema) {
+ InternalField sourceField =
+ findField(schema, key)
+            .orElseThrow(() -> new ReadException("Partition key not found in schema: " + key));
+ return InternalPartitionField.builder()
+ .sourceField(sourceField)
+ .transformType(PartitionTransformType.VALUE)
+ .build();
+ }
+
+  private Optional<InternalField> findField(InternalSchema schema, String path) {
+    return schema.getAllFields().stream().filter(f -> f.getPath().equals(path)).findFirst();
+ }
+
+  private InternalRowPartitionComputer newPartitionComputer(FileStoreTable table) {
+ return new InternalRowPartitionComputer(
+ table.coreOptions().partitionDefaultName(),
+ table.store().partitionType(),
+ table.partitionKeys().toArray(new String[0]),
+ table.coreOptions().legacyPartitionName());
+ }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonSchemaExtractor.java
new file mode 100644
index 00000000..6c9d824d
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonSchemaExtractor.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.schema.SchemaUtils;
+
+/** Converts Paimon RowType to XTable InternalSchema. */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PaimonSchemaExtractor {
+  private static final PaimonSchemaExtractor INSTANCE = new PaimonSchemaExtractor();
+
+ public static PaimonSchemaExtractor getInstance() {
+ return INSTANCE;
+ }
+
+ public InternalSchema toInternalSchema(TableSchema paimonSchema) {
+ RowType rowType = paimonSchema.logicalRowType();
+ List<InternalField> fields = toInternalFields(rowType);
+ return InternalSchema.builder()
+ .name("record")
+ .dataType(InternalType.RECORD)
+ .fields(fields)
+ .recordKeyFields(primaryKeyFields(paimonSchema, fields))
+ .build();
+ }
+
+ private List<InternalField> primaryKeyFields(
+ TableSchema paimonSchema, List<InternalField> internalFields) {
+ List<String> keys = paimonSchema.primaryKeys();
+ return internalFields.stream()
+ .filter(f -> keys.contains(f.getName()))
+ .collect(Collectors.toList());
+ }
+
+ private List<InternalField> toInternalFields(RowType rowType) {
+ List<InternalField> fields = new ArrayList<>(rowType.getFieldCount());
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ DataField dataField = rowType.getFields().get(i);
+ InternalField internalField =
+ InternalField.builder()
+ .name(dataField.name())
+ .fieldId(dataField.id())
+ .parentPath(null)
+ .schema(
+                  fromPaimonType(dataField.type(), dataField.name(), dataField.type().isNullable()))
+ .defaultValue(
+                  dataField.type().isNullable() ? InternalField.Constants.NULL_DEFAULT_VALUE : null)
+ .build();
+ fields.add(internalField);
+ }
+ return fields;
+ }
+
+  private InternalSchema fromPaimonType(DataType type, String fieldPath, boolean nullable) {
+ InternalType internalType;
+ List<InternalField> fields = null;
+ Map<InternalSchema.MetadataKey, Object> metadata = null;
+ if (type instanceof CharType || type instanceof VarCharType) {
+ internalType = InternalType.STRING;
+ } else if (type instanceof BooleanType) {
+ internalType = InternalType.BOOLEAN;
+ } else if (type instanceof TinyIntType
+ || type instanceof SmallIntType
+ || type instanceof IntType) {
+ internalType = InternalType.INT;
+ } else if (type instanceof BigIntType) {
+ internalType = InternalType.LONG;
+ } else if (type instanceof FloatType) {
+ internalType = InternalType.FLOAT;
+ } else if (type instanceof DoubleType) {
+ internalType = InternalType.DOUBLE;
+ } else if (type instanceof BinaryType || type instanceof VarBinaryType) {
+ internalType = InternalType.BYTES;
+ } else if (type instanceof DateType) {
+ internalType = InternalType.DATE;
+    } else if (type instanceof TimestampType || type instanceof LocalZonedTimestampType) {
+ internalType = InternalType.TIMESTAMP;
+ int precision;
+ if (type instanceof TimestampType) {
+ precision = ((TimestampType) type).getPrecision();
+ } else {
+ precision = ((LocalZonedTimestampType) type).getPrecision();
+ }
+
+ InternalSchema.MetadataValue precisionValue;
+ if (precision <= 3) {
+ precisionValue = InternalSchema.MetadataValue.MILLIS;
+ } else if (precision <= 6) {
+ precisionValue = InternalSchema.MetadataValue.MICROS;
+ } else {
+ precisionValue = InternalSchema.MetadataValue.NANOS;
+ }
+
+ metadata =
+          Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION, precisionValue);
+ } else if (type instanceof DecimalType) {
+ DecimalType d = (DecimalType) type;
+ metadata = new HashMap<>(2, 1.0f);
+      metadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, d.getPrecision());
+ metadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, d.getScale());
+ internalType = InternalType.DECIMAL;
+ } else if (type instanceof RowType) {
+ RowType rt = (RowType) type;
+ List<InternalField> nested = new ArrayList<>(rt.getFieldCount());
+ for (DataField df : rt.getFields()) {
+ nested.add(
+ InternalField.builder()
+ .name(df.name())
+ .fieldId(df.id())
+ .parentPath(fieldPath)
+ .schema(
+ fromPaimonType(
+ df.type(),
+                        SchemaUtils.getFullyQualifiedPath(fieldPath, df.name()),
+ df.type().isNullable()))
+ .defaultValue(
+                    df.type().isNullable() ? InternalField.Constants.NULL_DEFAULT_VALUE : null)
+ .build());
+ }
+ fields = nested;
+ internalType = InternalType.RECORD;
+ } else if (type instanceof ArrayType) {
+ ArrayType at = (ArrayType) type;
+ InternalSchema elementSchema =
+ fromPaimonType(
+ at.getElementType(),
+ SchemaUtils.getFullyQualifiedPath(
+ fieldPath, InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME),
+ at.getElementType().isNullable());
+ InternalField elementField =
+ InternalField.builder()
+ .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+ .parentPath(fieldPath)
+ .schema(elementSchema)
+ .build();
+ fields = Collections.singletonList(elementField);
+ internalType = InternalType.LIST;
+ } else if (type instanceof MapType) {
+ MapType mt = (MapType) type;
+ InternalSchema keySchema =
+ fromPaimonType(
+ mt.getKeyType(),
+ SchemaUtils.getFullyQualifiedPath(
+ fieldPath, InternalField.Constants.MAP_KEY_FIELD_NAME),
+ false);
+ InternalField keyField =
+ InternalField.builder()
+ .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+ .parentPath(fieldPath)
+ .schema(keySchema)
+ .build();
+ InternalSchema valueSchema =
+ fromPaimonType(
+ mt.getValueType(),
+ SchemaUtils.getFullyQualifiedPath(
+ fieldPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
+ mt.getValueType().isNullable());
+ InternalField valueField =
+ InternalField.builder()
+ .name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
+ .parentPath(fieldPath)
+ .schema(valueSchema)
+ .build();
+ fields = Arrays.asList(keyField, valueField);
+ internalType = InternalType.MAP;
+ } else {
+      throw new NotSupportedException("Unsupported Paimon type: " + type.asSQLString());
+ }
+
+ return InternalSchema.builder()
+ .name(type.asSQLString())
+ .dataType(internalType)
+ .isNullable(nullable)
+ .metadata(metadata)
+ .fields(fields)
+ .build();
+ }
+}
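For a quick read of the mapping implemented in fromPaimonType above, a sketch built with Paimon's DataTypes factory; the field names are arbitrary and each comment notes the InternalType the branches above assign:

    // Illustrative only, not part of the patch.
    RowType rowType =
        new RowType(
            Arrays.asList(
                new DataField(0, "id", DataTypes.INT()),                       // InternalType.INT
                new DataField(1, "name", DataTypes.VARCHAR(255)),              // InternalType.STRING
                new DataField(2, "price", DataTypes.DECIMAL(10, 2)),           // InternalType.DECIMAL + precision/scale metadata
                new DataField(3, "created_at", DataTypes.TIMESTAMP(6)),        // InternalType.TIMESTAMP with MICROS precision
                new DataField(4, "tags", DataTypes.ARRAY(DataTypes.STRING())), // InternalType.LIST
                new DataField(5, "attrs", DataTypes.MAP(DataTypes.STRING(), DataTypes.DOUBLE())))); // InternalType.MAP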
diff --git a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
index 14395e0d..a5670eac 100644
--- a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
@@ -21,6 +21,7 @@ package org.apache.xtable;
import static org.apache.xtable.model.storage.TableFormat.DELTA;
import static org.apache.xtable.model.storage.TableFormat.HUDI;
import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PAIMON;
import static org.apache.xtable.model.storage.TableFormat.PARQUET;
import java.nio.file.Path;
@@ -91,6 +92,9 @@ public interface GenericTable<T, Q> extends AutoCloseable {
case ICEBERG:
return TestIcebergTable.forStandardSchemaAndPartitioning(
tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration());
+ case PAIMON:
+ return TestPaimonTable.createTable(
+            tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration(), false);
default:
throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
}
@@ -113,6 +117,9 @@ public interface GenericTable<T, Q> extends AutoCloseable {
case ICEBERG:
return TestIcebergTable.forSchemaWithAdditionalColumnsAndPartitioning(
tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration());
+ case PAIMON:
+ return TestPaimonTable.createTable(
+            tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration(), true);
default:
throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
}
diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index bda54c0f..b8ea413b 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -24,6 +24,7 @@ import static org.apache.xtable.hudi.HudiTestUtil.PartitionConfig;
import static org.apache.xtable.model.storage.TableFormat.DELTA;
import static org.apache.xtable.model.storage.TableFormat.HUDI;
import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PAIMON;
import static org.apache.xtable.model.storage.TableFormat.PARQUET;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -104,9 +105,11 @@ import org.apache.xtable.iceberg.IcebergConversionSourceProvider;
import org.apache.xtable.iceberg.TestIcebergDataHelper;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;
+import org.apache.xtable.paimon.PaimonConversionSourceProvider;
public class ITConversionController {
@TempDir public static Path tempDir;
+
private static final DateTimeFormatter DATE_FORMAT =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("UTC"));
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -117,12 +120,14 @@ public class ITConversionController {
@BeforeAll
public static void setupOnce() {
SparkConf sparkConf = HudiTestUtil.getSparkConf(tempDir);
+
sparkSession =
SparkSession.builder().config(HoodieReadClient.addHoodieSupport(sparkConf)).getOrCreate();
sparkSession
.sparkContext()
.hadoopConfiguration()
.set("parquet.avro.write-old-list-structure", "false");
+
jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
}
@@ -142,10 +147,13 @@ public class ITConversionController {
private static Stream<Arguments> generateTestParametersForFormatsSyncModesAndPartitioning() {
List<Arguments> arguments = new ArrayList<>();
- for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG)) {
+ for (String sourceFormat : Arrays.asList(HUDI, DELTA, ICEBERG, PAIMON)) {
for (SyncMode syncMode : SyncMode.values()) {
+ if (sourceFormat.equals(PAIMON) && syncMode == SyncMode.INCREMENTAL)
+ continue; // Paimon does not support incremental sync yet
+
for (boolean isPartitioned : new boolean[] {true, false}) {
-          arguments.add(Arguments.of(sourceTableFormat, syncMode, isPartitioned));
+ arguments.add(Arguments.of(sourceFormat, syncMode, isPartitioned));
}
}
}
@@ -170,23 +178,37 @@ public class ITConversionController {
}
private ConversionSourceProvider<?> getConversionSourceProvider(String sourceTableFormat) {
- if (sourceTableFormat.equalsIgnoreCase(HUDI)) {
- ConversionSourceProvider<HoodieInstant> hudiConversionSourceProvider =
- new HudiConversionSourceProvider();
- hudiConversionSourceProvider.init(jsc.hadoopConfiguration());
- return hudiConversionSourceProvider;
- } else if (sourceTableFormat.equalsIgnoreCase(DELTA)) {
- ConversionSourceProvider<Long> deltaConversionSourceProvider =
- new DeltaConversionSourceProvider();
- deltaConversionSourceProvider.init(jsc.hadoopConfiguration());
- return deltaConversionSourceProvider;
- } else if (sourceTableFormat.equalsIgnoreCase(ICEBERG)) {
- ConversionSourceProvider<Snapshot> icebergConversionSourceProvider =
- new IcebergConversionSourceProvider();
- icebergConversionSourceProvider.init(jsc.hadoopConfiguration());
- return icebergConversionSourceProvider;
- } else {
-      throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
+ switch (sourceTableFormat.toUpperCase()) {
+ case HUDI:
+ {
+          ConversionSourceProvider<HoodieInstant> hudiConversionSourceProvider =
+ new HudiConversionSourceProvider();
+ hudiConversionSourceProvider.init(jsc.hadoopConfiguration());
+ return hudiConversionSourceProvider;
+ }
+ case DELTA:
+ {
+ ConversionSourceProvider<Long> deltaConversionSourceProvider =
+ new DeltaConversionSourceProvider();
+ deltaConversionSourceProvider.init(jsc.hadoopConfiguration());
+ return deltaConversionSourceProvider;
+ }
+ case ICEBERG:
+ {
+ ConversionSourceProvider<Snapshot> icebergConversionSourceProvider =
+ new IcebergConversionSourceProvider();
+ icebergConversionSourceProvider.init(jsc.hadoopConfiguration());
+ return icebergConversionSourceProvider;
+ }
+ case PAIMON:
+ {
+          ConversionSourceProvider<org.apache.paimon.Snapshot> paimonConversionSourceProvider =
+ new PaimonConversionSourceProvider();
+ paimonConversionSourceProvider.init(jsc.hadoopConfiguration());
+ return paimonConversionSourceProvider;
+ }
+ default:
+        throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
}
}
@@ -486,11 +508,9 @@ public class ITConversionController {
private static List<String> getOtherFormats(String sourceTableFormat) {
return Arrays.stream(TableFormat.values())
- .filter(
- format ->
- !format.equals(sourceTableFormat)
- && !format.equals(
-                    PARQUET)) // excluded file formats because upset, insert etc. not supported
+ .filter(fmt -> !fmt.equals(sourceTableFormat))
+        .filter(fmt -> !fmt.equals(PAIMON)) // Paimon target is not supported yet
+        .filter(fmt -> !fmt.equals(PARQUET)) // upserts/inserts are not supported in Parquet
.collect(Collectors.toList());
}
@@ -911,34 +931,34 @@ public class ITConversionController {
}));
String[] selectColumnsArr = sourceTable.getColumnsToSelect().toArray(new String[] {});
-    List<String> dataset1Rows = sourceRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
+    List<String> sourceRowsList = sourceRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
targetRowsByFormat.forEach(
- (format, targetRows) -> {
- List<String> dataset2Rows =
+ (targetFormat, targetRows) -> {
+ List<String> targetRowsList =
targetRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
assertEquals(
- dataset1Rows.size(),
- dataset2Rows.size(),
+ sourceRowsList.size(),
+ targetRowsList.size(),
String.format(
"Datasets have different row counts when reading from Spark.
Source: %s, Target: %s",
- sourceFormat, format));
+ sourceFormat, targetFormat));
// sanity check the count to ensure test is set up properly
if (expectedCount != null) {
- assertEquals(expectedCount, dataset1Rows.size());
+ assertEquals(expectedCount, sourceRowsList.size());
} else {
// if count is not known ahead of time, ensure datasets are non-empty
- assertFalse(dataset1Rows.isEmpty());
+ assertFalse(sourceRowsList.isEmpty());
}
-          if (containsUUIDFields(dataset1Rows) && containsUUIDFields(dataset2Rows)) {
-            compareDatasetWithUUID(dataset1Rows, dataset2Rows);
+          if (containsUUIDFields(sourceRowsList) && containsUUIDFields(targetRowsList)) {
+ compareDatasetWithUUID(sourceRowsList, targetRowsList);
} else {
assertEquals(
- dataset1Rows,
- dataset2Rows,
+ sourceRowsList,
+ targetRowsList,
String.format(
"Datasets are not equivalent when reading from Spark.
Source: %s, Target: %s",
- sourceFormat, format));
+ sourceFormat, targetFormat));
}
});
}
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java b/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
new file mode 100644
index 00000000..55102007
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.manifest.BucketEntry;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.ParameterUtils;
+
+public class TestPaimonTable implements GenericTable<GenericRow, String> {
+
+ private final Random random = new Random();
+ private final FileStoreTable paimonTable;
+ private final String partitionField;
+
+ public TestPaimonTable(FileStoreTable paimonTable, String partitionField) {
+ this.paimonTable = paimonTable;
+ this.partitionField = partitionField;
+ }
+
+ public static GenericTable<GenericRow, String> createTable(
+ String tableName,
+ String partitionField,
+ Path tempDir,
+ Configuration hadoopConf,
+ boolean additionalColumns) {
+ String basePath = initBasePath(tempDir, tableName);
+ Catalog catalog = createFilesystemCatalog(basePath, hadoopConf);
+    FileStoreTable paimonTable = createTable(catalog, partitionField, additionalColumns);
+
+ System.out.println(
+ "Initialized Paimon test table at base path: "
+ + basePath
+ + " with partition field: "
+ + partitionField
+ + " and additional columns: "
+ + additionalColumns);
+
+ return new TestPaimonTable(paimonTable, partitionField);
+ }
+
+  public static Catalog createFilesystemCatalog(String basePath, Configuration hadoopConf) {
+    CatalogContext context = CatalogContext.create(new org.apache.paimon.fs.Path(basePath));
+ return CatalogFactory.createCatalog(context);
+ }
+
+ public static FileStoreTable createTable(
+ Catalog catalog, String partitionField, boolean additionalColumns) {
+ try {
+ catalog.createDatabase("test_db", true);
+ Identifier identifier = Identifier.create("test_db", "test_table");
+ Schema schema = buildSchema(partitionField, additionalColumns);
+ catalog.createTable(identifier, schema, true);
+ return (FileStoreTable) catalog.getTable(identifier);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+  private static Schema buildSchema(String partitionField, boolean additionalColumns) {
+ Schema.Builder builder =
+ Schema.newBuilder()
+ .primaryKey("id")
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("value", DataTypes.DOUBLE())
+ .column("created_at", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+ .column("updated_at", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+ .column("is_active", DataTypes.BOOLEAN())
+ .column("description", DataTypes.VARCHAR(255))
+ .option("bucket", "1")
+ .option("bucket-key", "id")
+ .option("full-compaction.delta-commits", "1");
+
+ if (partitionField != null) {
+ builder
+ .primaryKey("id", partitionField)
+ .column(partitionField, DataTypes.STRING())
+ .partitionKeys(partitionField);
+ }
+
+ if (additionalColumns) {
+      builder.column("extra_info", DataTypes.STRING()).column("extra_value", DataTypes.DOUBLE());
+ }
+
+ return builder.build();
+ }
+
+  private GenericRow buildGenericRow(int rowIdx, TableSchema schema, String partitionValue) {
+ List<Object> rowValues = new ArrayList<>(schema.fields().size());
+ for (int i = 0; i < schema.fields().size(); i++) {
+ DataField field = schema.fields().get(i);
+ if (field.name().equals(partitionField)) {
+ rowValues.add(BinaryString.fromString(partitionValue));
+ } else if (field.type() instanceof IntType) {
+ rowValues.add(random.nextInt());
+ } else if (field.type() instanceof DoubleType) {
+ rowValues.add(random.nextDouble());
+ } else if (field.type() instanceof VarCharType) {
+        rowValues.add(BinaryString.fromString(field.name() + "_" + rowIdx + "_" + i));
+ } else if (field.type() instanceof LocalZonedTimestampType) {
+ rowValues.add(Timestamp.fromEpochMillis(System.currentTimeMillis()));
+ } else if (field.type() instanceof BooleanType) {
+ rowValues.add(random.nextBoolean());
+ } else {
+        throw new UnsupportedOperationException("Unsupported field type: " + field.type());
+ }
+ }
+
+ return GenericRow.of(rowValues.toArray());
+ }
+
+ private static String initBasePath(Path tempDir, String tableName) {
+ try {
+ Path basePath = tempDir.resolve(tableName);
+ Files.createDirectories(basePath);
+ return basePath.toUri().toString();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public List<GenericRow> insertRows(int numRows) {
+ String partitionValue = LEVEL_VALUES.get(0);
+ return insertRecordsToPartition(numRows, partitionValue);
+ }
+
+ @Override
+ public List<GenericRow> insertRecordsForSpecialPartition(int numRows) {
+ return insertRecordsToPartition(numRows, SPECIAL_PARTITION_VALUE);
+ }
+
+  private List<GenericRow> insertRecordsToPartition(int numRows, String partitionValue) {
+ BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
+ try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
+ List<GenericRow> rows = new ArrayList<>(numRows);
+ for (int i = 0; i < numRows; i++) {
+        GenericRow row = buildGenericRow(i, paimonTable.schema(), partitionValue);
+ writer.write(row);
+ rows.add(row);
+ }
+ commitWrites(batchWriteBuilder, writer);
+ compactTable();
+ return rows;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to insert rows into Paimon table", e);
+ }
+ }
+
+ @Override
+ public void upsertRows(List<GenericRow> rows) {
+ BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
+ try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
+ for (GenericRow row : rows) {
+ writer.write(row);
+ }
+ commitWrites(batchWriteBuilder, writer);
+ compactTable();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to upsert rows into Paimon table", e);
+ }
+ }
+
+ @Override
+ public void deleteRows(List<GenericRow> rows) {
+ BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
+ try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
+ for (GenericRow row : rows) {
+ row.setRowKind(RowKind.DELETE);
+ writer.write(row);
+ }
+ commitWrites(batchWriteBuilder, writer);
+ compactTable();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to delete rows from Paimon table", e);
+ }
+ }
+
+ private void compactTable() {
+ BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
+ SnapshotReader snapshotReader = paimonTable.newSnapshotReader();
+ try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
+ for (BucketEntry bucketEntry : snapshotReader.bucketEntries()) {
+ writer.compact(bucketEntry.partition(), bucketEntry.bucket(), true);
+ }
+ commitWrites(batchWriteBuilder, writer);
+ } catch (Exception e) {
+      throw new RuntimeException("Failed to compact writes in Paimon table", e);
+ }
+ }
+
+  private static void commitWrites(BatchWriteBuilder batchWriteBuilder, BatchTableWrite writer)
+ throws Exception {
+ BatchTableCommit commit = batchWriteBuilder.newCommit();
+ List<CommitMessage> messages = writer.prepareCommit();
+ try {
+ commit.commit(messages);
+ } catch (Exception e) {
+ commit.abort(messages);
+ throw new RuntimeException("Failed to commit writes to Paimon table", e);
+ } finally {
+ commit.close();
+ }
+ }
+
+ @Override
+ public void deletePartition(String partitionValue) {
+    try (BatchTableCommit commit = paimonTable.newBatchWriteBuilder().newCommit()) {
+ commit.truncatePartitions(
+ ParameterUtils.getPartitions(partitionField + "=" + partitionValue));
+ } catch (Exception e) {
+      throw new RuntimeException("Failed to delete partition from Paimon table", e);
+ }
+ }
+
+ @Override
+ public void deleteSpecialPartition() {
+ deletePartition(SPECIAL_PARTITION_VALUE);
+ }
+
+ @Override
+ public String getBasePath() {
+ return paimonTable.location().toString();
+ }
+
+ @Override
+ public String getMetadataPath() {
+ return paimonTable.snapshotManager().snapshotDirectory().toString();
+ }
+
+ @Override
+ public String getOrderByColumn() {
+ return "id";
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public void reload() {}
+
+ @Override
+ public List<String> getColumnsToSelect() {
+ return paimonTable.schema().fieldNames().stream()
+ .filter(
+          // TODO Hudi thinks that paimon buckets are partition values, not sure how to handle it
+ // filtering out the partition field on the comparison for now
+ field -> !field.equals(partitionField))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public String getFilterQuery() {
+ return "id % 2 = 0";
+ }
+
+ public FileStoreTable getPaimonTable() {
+ return paimonTable;
+ }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
new file mode 100644
index 00000000..4d8f8c2b
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.table.FileStoreTable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.TestPaimonTable;
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
+
+public class TestPaimonConversionSource {
+
+ @TempDir private Path tempDir;
+
+ private Configuration hadoopConf;
+ private TestPaimonTable testTable;
+ private FileStoreTable paimonTable;
+ private PaimonConversionSource conversionSource;
+
+ @BeforeEach
+ void setUp() {
+ hadoopConf = new Configuration();
+ testTable =
+ ((TestPaimonTable)
+            TestPaimonTable.createTable("test_table", "level", tempDir, hadoopConf, false));
+ paimonTable = testTable.getPaimonTable();
+ conversionSource = new PaimonConversionSource(paimonTable);
+ }
+
+ @Test
+ void testGetTableWithPartitionedTable() {
+ testTable.insertRows(5);
+
+ Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(snapshot);
+
+ InternalTable result = conversionSource.getTable(snapshot);
+
+ assertNotNull(result);
+ assertEquals("test_table", result.getName());
+ assertEquals(TableFormat.PAIMON, result.getTableFormat());
+ assertNotNull(result.getReadSchema());
+    assertEquals(DataLayoutStrategy.HIVE_STYLE_PARTITION, result.getLayoutStrategy());
+ assertTrue(result.getBasePath().contains("test_table"));
+ assertEquals(1, result.getPartitioningFields().size());
+    assertEquals("level", result.getPartitioningFields().get(0).getSourceField().getName());
+    assertEquals(Instant.ofEpochMilli(snapshot.timeMillis()), result.getLatestCommitTime());
+ assertNotNull(result.getLatestMetadataPath());
+ }
+
+ @Test
+ void testGetTableWithUnpartitionedTable() {
+ GenericTable<?, String> unpartitionedTable =
+        TestPaimonTable.createTable("unpartitioned_table", null, tempDir, hadoopConf, false);
+ FileStoreTable unpartitionedPaimonTable =
+ ((TestPaimonTable) unpartitionedTable).getPaimonTable();
+ PaimonConversionSource unpartitionedSource =
+ new PaimonConversionSource(unpartitionedPaimonTable);
+
+ unpartitionedTable.insertRows(3);
+
+    Snapshot snapshot = unpartitionedPaimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(snapshot);
+
+ InternalTable result = unpartitionedSource.getTable(snapshot);
+
+ assertNotNull(result);
+ assertEquals("test_table", result.getName());
+ assertEquals(TableFormat.PAIMON, result.getTableFormat());
+ assertNotNull(result.getReadSchema());
+    assertEquals(DataLayoutStrategy.HIVE_STYLE_PARTITION, result.getLayoutStrategy());
+ assertTrue(result.getBasePath().contains("unpartitioned_table"));
+ assertEquals(0, result.getPartitioningFields().size());
+    assertEquals(Instant.ofEpochMilli(snapshot.timeMillis()), result.getLatestCommitTime());
+ assertNotNull(result.getLatestMetadataPath());
+ }
+
+ @Test
+ void testGetCurrentTableSuccess() {
+ testTable.insertRows(3);
+
+ InternalTable result = conversionSource.getCurrentTable();
+
+ assertNotNull(result);
+ assertEquals(TableFormat.PAIMON, result.getTableFormat());
+ assertEquals("test_table", result.getName());
+ assertNotNull(result.getReadSchema());
+    assertEquals(DataLayoutStrategy.HIVE_STYLE_PARTITION, result.getLayoutStrategy());
+ assertEquals(1, result.getPartitioningFields().size());
+ }
+
+ @Test
+ void testGetCurrentTableThrowsExceptionWhenNoSnapshot() {
+ GenericTable<?, String> emptyTable =
+        TestPaimonTable.createTable("empty_table", "level", tempDir, hadoopConf, false);
+    FileStoreTable emptyPaimonTable = ((TestPaimonTable) emptyTable).getPaimonTable();
+    PaimonConversionSource emptySource = new PaimonConversionSource(emptyPaimonTable);
+
+    ReadException exception = assertThrows(ReadException.class, emptySource::getCurrentTable);
+
+    assertTrue(exception.getMessage().contains("No snapshots found for table"));
+ }
+
+ @Test
+ void testGetCurrentSnapshotSuccess() {
+ testTable.insertRows(5);
+
+ InternalSnapshot result = conversionSource.getCurrentSnapshot();
+
+ assertNotNull(result);
+ assertNotNull(result.getTable());
+ assertEquals(TableFormat.PAIMON, result.getTable().getTableFormat());
+ assertNotNull(result.getVersion());
+ assertNotNull(result.getSourceIdentifier());
+ assertNotNull(result.getPartitionedDataFiles());
+
+    List<PartitionFileGroup> partitionFileGroups = result.getPartitionedDataFiles();
+ assertFalse(partitionFileGroups.isEmpty());
+    assertTrue(partitionFileGroups.stream().allMatch(group -> !group.getDataFiles().isEmpty()));
+ }
+
+ @Test
+ void testGetCurrentSnapshotThrowsExceptionWhenNoSnapshot() {
+ GenericTable<?, String> emptyTable =
+        TestPaimonTable.createTable("empty_table2", "level", tempDir, hadoopConf, false);
+    FileStoreTable emptyPaimonTable = ((TestPaimonTable) emptyTable).getPaimonTable();
+    PaimonConversionSource emptySource = new PaimonConversionSource(emptyPaimonTable);
+
+    ReadException exception = assertThrows(ReadException.class, emptySource::getCurrentSnapshot);
+
+    assertTrue(exception.getMessage().contains("No snapshots found for table"));
+ }
+
+ @Test
+ void testGetTableChangeForCommitThrowsUnsupportedOperationException() {
+ testTable.insertRows(3);
+ Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+
+ UnsupportedOperationException exception =
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> conversionSource.getTableChangeForCommit(snapshot));
+
+    assertEquals("Incremental Sync is not supported yet.", exception.getMessage());
+ }
+
+ @Test
+ void testGetCommitsBacklogThrowsUnsupportedOperationException() {
+ InstantsForIncrementalSync mockInstants =
+        InstantsForIncrementalSync.builder().lastSyncInstant(Instant.now()).build();
+
+ UnsupportedOperationException exception =
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> conversionSource.getCommitsBacklog(mockInstants));
+
+    assertEquals("Incremental Sync is not supported yet.", exception.getMessage());
+ }
+
+ @Test
+ void testIsIncrementalSyncSafeFromReturnsFalse() {
+ Instant testInstant = Instant.now();
+
+ boolean result = conversionSource.isIncrementalSyncSafeFrom(testInstant);
+
+ assertFalse(result);
+ }
+
+ @Test
+ void testGetCommitIdentifier() {
+ testTable.insertRows(3);
+ Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+
+ String result = conversionSource.getCommitIdentifier(snapshot);
+
+ assertNotNull(result);
+ assertEquals(String.valueOf(snapshot.commitIdentifier()), result);
+ }
+
+ @Test
+ void testCloseDoesNotThrowException() {
+ assertDoesNotThrow(() -> conversionSource.close());
+ }
+
+ @Test
+ void testConstructorInitializesFieldsCorrectly() {
+ assertNotNull(conversionSource);
+
+ testTable.insertRows(1);
+ assertDoesNotThrow(() -> conversionSource.getCurrentTable());
+ }
+
+ @Test
+ void testMultipleSnapshots() {
+ testTable.insertRows(2);
+ Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(firstSnapshot);
+
+ testTable.insertRows(3);
+ Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(secondSnapshot);
+
+ assertNotEquals(firstSnapshot.id(), secondSnapshot.id());
+
+ InternalTable firstTable = conversionSource.getTable(firstSnapshot);
+ InternalTable secondTable = conversionSource.getTable(secondSnapshot);
+
+ assertNotNull(firstTable);
+ assertNotNull(secondTable);
+ assertEquals(firstTable.getName(), secondTable.getName());
+ assertEquals(firstTable.getTableFormat(), secondTable.getTableFormat());
+ }
+
+ @Test
+ void testSchemaEvolution() {
+ testTable.insertRows(2);
+
+ GenericTable<?, String> tableWithExtraColumns =
+ TestPaimonTable.createTable("table_with_extra", "level", tempDir,
hadoopConf, true);
+ FileStoreTable extraColumnsPaimonTable =
+ ((TestPaimonTable) tableWithExtraColumns).getPaimonTable();
+ PaimonConversionSource extraColumnsSource = new
PaimonConversionSource(extraColumnsPaimonTable);
+
+ tableWithExtraColumns.insertRows(2);
+
+ InternalTable originalTable = conversionSource.getCurrentTable();
+ InternalTable expandedTable = extraColumnsSource.getCurrentTable();
+
+ assertNotNull(originalTable);
+ assertNotNull(expandedTable);
+
+ assertTrue(
+ expandedTable.getReadSchema().getFields().size()
+ >= originalTable.getReadSchema().getFields().size());
+ }
+}
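
A minimal usage sketch of the conversion source covered by the tests above, assuming a Paimon FileStoreTable has already been loaded by the caller; the wrapper class and variable names are illustrative only and not part of this commit.

import org.apache.paimon.table.FileStoreTable;

import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.paimon.PaimonConversionSource;

public class PaimonSourceSketch {
  // A loaded Paimon FileStoreTable is assumed to be supplied by the caller.
  static void describe(FileStoreTable table) {
    PaimonConversionSource source = new PaimonConversionSource(table);

    // Table-level metadata: name, format, read schema, layout and partition fields.
    InternalTable current = source.getCurrentTable();
    System.out.println(current.getName() + " [" + current.getTableFormat() + "]");

    // Snapshot-level metadata: version, source identifier, and data files grouped by partition.
    InternalSnapshot snapshot = source.getCurrentSnapshot();
    for (PartitionFileGroup group : snapshot.getPartitionedDataFiles()) {
      System.out.println("partition group with " + group.getDataFiles().size() + " data files");
    }
  }
}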
diff --git
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
new file mode 100644
index 00000000..9f906516
--- /dev/null
+++
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.table.FileStoreTable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.xtable.TestPaimonTable;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+public class TestPaimonDataFileExtractor {
+ private static final PaimonDataFileExtractor extractor =
PaimonDataFileExtractor.getInstance();
+
+ @TempDir private Path tempDir;
+ private TestPaimonTable testTable;
+ private FileStoreTable paimonTable;
+ private InternalSchema testSchema;
+
+ @Test
+ void testToInternalDataFilesWithUnpartitionedTable() {
+ createUnpartitionedTable();
+
+ // Insert some data to create files
+ testTable.insertRows(5);
+
+ List<InternalDataFile> result =
+ extractor.toInternalDataFiles(
+ paimonTable, paimonTable.snapshotManager().latestSnapshot(),
testSchema);
+
+ assertNotNull(result);
+ assertFalse(result.isEmpty());
+
+ InternalDataFile dataFile = result.get(0);
+ assertNotNull(dataFile.getPhysicalPath());
+ assertTrue(dataFile.getPhysicalPath().contains("bucket-"));
+ assertTrue(dataFile.getFileSizeBytes() > 0);
+ assertEquals(5, dataFile.getRecordCount());
+ assertEquals(0, dataFile.getPartitionValues().size());
+ }
+
+ @Test
+ void testToInternalDataFilesWithPartitionedTable() {
+ createPartitionedTable();
+
+ // Insert some data to create files
+ testTable.insertRows(5);
+
+ List<InternalDataFile> result =
+ extractor.toInternalDataFiles(
+ paimonTable, paimonTable.snapshotManager().latestSnapshot(),
testSchema);
+
+ assertNotNull(result);
+ assertFalse(result.isEmpty());
+
+ InternalDataFile dataFile = result.get(0);
+ assertNotNull(dataFile.getPhysicalPath());
+ assertTrue(dataFile.getPhysicalPath().contains("bucket-"));
+ assertTrue(dataFile.getFileSizeBytes() > 0);
+ assertEquals(5, dataFile.getRecordCount());
+ assertNotNull(dataFile.getPartitionValues());
+ }
+
+ @Test
+ void testToInternalDataFilesWithTableWithPrimaryKeys() {
+ createTableWithPrimaryKeys();
+
+ // Insert some data to create files
+ testTable.insertRows(5);
+
+ // Get the latest snapshot
+ List<InternalDataFile> result =
+ extractor.toInternalDataFiles(
+ paimonTable, paimonTable.snapshotManager().latestSnapshot(),
testSchema);
+
+ assertNotNull(result);
+ assertFalse(result.isEmpty());
+
+ InternalDataFile dataFile = result.get(0);
+ assertNotNull(dataFile.getPhysicalPath());
+ assertTrue(dataFile.getFileSizeBytes() > 0);
+ assertEquals(5, dataFile.getRecordCount());
+ }
+
+ @Test
+ void testPhysicalPathFormat() {
+ createUnpartitionedTable();
+
+ // Insert data
+ testTable.insertRows(2);
+
+ List<InternalDataFile> result =
+ extractor.toInternalDataFiles(
+ paimonTable, paimonTable.snapshotManager().latestSnapshot(),
testSchema);
+
+ assertFalse(result.isEmpty());
+
+ for (InternalDataFile dataFile : result) {
+ String path = dataFile.getPhysicalPath();
+ assertTrue(path.contains("bucket-"));
+ assertTrue(path.endsWith(".orc") || path.endsWith(".parquet"));
+ }
+ }
+
+ @Test
+ void testColumnStatsAreEmpty() {
+ createUnpartitionedTable();
+
+ testTable.insertRows(1);
+
+ List<InternalDataFile> result =
+ extractor.toInternalDataFiles(
+ paimonTable, paimonTable.snapshotManager().latestSnapshot(),
testSchema);
+
+ assertFalse(result.isEmpty());
+ for (InternalDataFile dataFile : result) {
+ assertEquals(0, dataFile.getColumnStats().size());
+ }
+ }
+
+ private void createUnpartitionedTable() {
+ testTable =
+ (TestPaimonTable)
+ TestPaimonTable.createTable("test_table", null, tempDir, new
Configuration(), false);
+ paimonTable = testTable.getPaimonTable();
+ testSchema =
+ InternalSchema.builder().build(); // empty schema won't matter for
non-partitioned tables
+ }
+
+ private void createPartitionedTable() {
+ testTable =
+ (TestPaimonTable)
+ TestPaimonTable.createTable("test_table", "level", tempDir, new
Configuration(), false);
+ paimonTable = testTable.getPaimonTable();
+
+ // just the partition field matters for this test
+ InternalField partitionField =
+ InternalField.builder()
+ .name("level")
+
.schema(InternalSchema.builder().dataType(InternalType.STRING).build())
+ .build();
+
+ testSchema =
InternalSchema.builder().fields(Collections.singletonList(partitionField)).build();
+ }
+
+ private void createTableWithPrimaryKeys() {
+ testTable =
+ (TestPaimonTable)
+ TestPaimonTable.createTable("test_table", null, tempDir, new
Configuration(), false);
+ paimonTable = testTable.getPaimonTable();
+ testSchema =
+ InternalSchema.builder().build(); // empty schema won't matter for
non-partitioned tables
+ }
+}
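
A minimal sketch of invoking the data file extractor on its own, matching the calls exercised above; the table and schema are assumed to be supplied by the caller, and the wrapper class name is illustrative.

import java.util.List;

import org.apache.paimon.table.FileStoreTable;

import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.paimon.PaimonDataFileExtractor;

public class PaimonDataFileSketch {
  // Table and schema are assumed to be supplied by the caller.
  static void listFiles(FileStoreTable table, InternalSchema schema) {
    PaimonDataFileExtractor extractor = PaimonDataFileExtractor.getInstance();
    List<InternalDataFile> files =
        extractor.toInternalDataFiles(table, table.snapshotManager().latestSnapshot(), schema);
    for (InternalDataFile file : files) {
      // Paths point into Paimon bucket directories, e.g. ".../bucket-0/....orc" or ".parquet".
      System.out.println(file.getPhysicalPath() + " (" + file.getRecordCount() + " records)");
    }
  }
}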
diff --git
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonPartitionExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonPartitionExtractor.java
new file mode 100644
index 00000000..248c9f6f
--- /dev/null
+++
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonPartitionExtractor.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.table.FileStoreTable;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.xtable.TestPaimonTable;
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+
+public class TestPaimonPartitionExtractor {
+ private static final PaimonPartitionExtractor extractor =
PaimonPartitionExtractor.getInstance();
+
+ @TempDir private Path tempDir;
+
+ @Test
+ void testToInternalPartitionFieldsWithEmptyKeys() {
+ InternalSchema schema = createMockSchema();
+
+ List<InternalPartitionField> result =
extractor.toInternalPartitionFields(null, schema);
+ assertEquals(Collections.emptyList(), result);
+
+ result = extractor.toInternalPartitionFields(Collections.emptyList(),
schema);
+ assertEquals(Collections.emptyList(), result);
+ }
+
+ @Test
+ void testToInternalPartitionFieldsWithSingleKey() {
+ InternalSchema schema = createMockSchema();
+ List<String> partitionKeys = Collections.singletonList("level");
+
+ List<InternalPartitionField> result =
+ extractor.toInternalPartitionFields(partitionKeys, schema);
+
+ assertEquals(1, result.size());
+ InternalPartitionField partitionField = result.get(0);
+ assertEquals("level", partitionField.getSourceField().getName());
+ assertEquals(PartitionTransformType.VALUE,
partitionField.getTransformType());
+ }
+
+ @Test
+ void testToInternalPartitionFieldsWithMultipleKeys() {
+ InternalSchema schema = createMockSchema();
+ List<String> partitionKeys = Arrays.asList("level", "status");
+
+ List<InternalPartitionField> result =
+ extractor.toInternalPartitionFields(partitionKeys, schema);
+
+ assertEquals(2, result.size());
+ assertEquals("level", result.get(0).getSourceField().getName());
+ assertEquals("status", result.get(1).getSourceField().getName());
+ assertEquals(PartitionTransformType.VALUE,
result.get(0).getTransformType());
+ assertEquals(PartitionTransformType.VALUE,
result.get(1).getTransformType());
+ }
+
+ @Test
+ void testToInternalPartitionFieldsWithMissingKey() {
+ InternalSchema schema = createMockSchema();
+ List<String> partitionKeys = Collections.singletonList("missing_key");
+
+ ReadException exception =
+ assertThrows(
+ ReadException.class, () ->
extractor.toInternalPartitionFields(partitionKeys, schema));
+
+ assertTrue(exception.getMessage().contains("Partition key not found in
schema: missing_key"));
+ }
+
+ @Test
+ void testToPartitionValuesWithPartitionedTable() {
+ TestPaimonTable testTable = createPartitionedTable();
+ FileStoreTable paimonTable = testTable.getPaimonTable();
+
+ testTable.insertRows(1);
+
+ BinaryRow partition = BinaryRow.singleColumn("INFO");
+
+ InternalSchema schema = createMockSchema();
+ List<PartitionValue> result = extractor.toPartitionValues(paimonTable,
partition, schema);
+
+ assertEquals(1, result.size());
+ PartitionValue partitionValue = result.get(0);
+ assertEquals("level",
partitionValue.getPartitionField().getSourceField().getName());
+ assertEquals(Range.scalar("INFO"), partitionValue.getRange());
+ }
+
+ @Test
+ @Disabled("TODO: make it easier to create multi-partitioned table in tests")
+ void testToPartitionPathWithMultiplePartitionValues() {
+ // TODO: this table is fixed to a single partition; a multi-partitioned table is needed for this test
+ TestPaimonTable testTable = createPartitionedTable();
+ FileStoreTable paimonTable = testTable.getPaimonTable();
+
+ BinaryRow partition = new BinaryRow(2);
+ BinaryRowWriter writer = new BinaryRowWriter(partition);
+ writer.writeString(0, BinaryString.fromString("INFO"));
+ writer.writeString(1, BinaryString.fromString("active"));
+ writer.complete();
+
+ Optional<String> result = extractor.toPartitionPath(paimonTable,
partition);
+
+ assertTrue(result.isPresent());
+ assertEquals("level=INFO/level=DEBUG", result.get());
+ }
+
+ @Test
+ void testToPartitionPathWithEmptyPartitions() {
+ TestPaimonTable testTable = createUnpartitionedTable();
+ FileStoreTable paimonTable = testTable.getPaimonTable();
+
+ BinaryRow emptyPartition = BinaryRow.EMPTY_ROW;
+
+ Optional<String> result = extractor.toPartitionPath(paimonTable,
emptyPartition);
+
+ assertFalse(result.isPresent());
+ }
+
+ private InternalSchema createMockSchema() {
+ InternalField levelField =
+ InternalField.builder()
+ .name("level")
+ .schema(
+ InternalSchema.builder()
+ .name("STRING")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .build();
+
+ InternalField statusField =
+ InternalField.builder()
+ .name("status")
+ .schema(
+ InternalSchema.builder()
+ .name("STRING")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .build();
+
+ return InternalSchema.builder()
+ .name("test_schema")
+ .dataType(InternalType.RECORD)
+ .fields(Arrays.asList(levelField, statusField))
+ .build();
+ }
+
+ private TestPaimonTable createPartitionedTable() {
+ return (TestPaimonTable)
+ TestPaimonTable.createTable("test_table", "level", tempDir, new
Configuration(), false);
+ }
+
+ private TestPaimonTable createUnpartitionedTable() {
+ return (TestPaimonTable)
+ TestPaimonTable.createTable("test_table", null, tempDir, new
Configuration(), false);
+ }
+}
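
A minimal sketch of the two partition-extractor entry points exercised above, assuming the schema comes from the schema extractor and the partition row from a Paimon data split or manifest entry; names outside those calls are illustrative.

import java.util.List;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.FileStoreTable;

import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.paimon.PaimonPartitionExtractor;

public class PaimonPartitionSketch {
  // The schema is assumed to come from PaimonSchemaExtractor; the partition row
  // is assumed to come from a Paimon data split or manifest entry.
  static void describePartition(FileStoreTable table, InternalSchema schema, BinaryRow partition) {
    PaimonPartitionExtractor extractor = PaimonPartitionExtractor.getInstance();

    // Paimon partition keys -> XTable partition fields (VALUE transform).
    List<InternalPartitionField> fields =
        extractor.toInternalPartitionFields(table.partitionKeys(), schema);

    // A concrete partition row -> one PartitionValue per partition field.
    List<PartitionValue> values = extractor.toPartitionValues(table, partition, schema);

    System.out.println(fields.size() + " partition fields, " + values.size() + " partition values");
  }
}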
diff --git
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonSchemaExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonSchemaExtractor.java
new file mode 100644
index 00000000..77f0ece0
--- /dev/null
+++
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonSchemaExtractor.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+
+public class TestPaimonSchemaExtractor {
+ private static final PaimonSchemaExtractor schemaExtractor =
PaimonSchemaExtractor.getInstance();
+
+ private void assertField(DataField paimonField, InternalField
expectedInternalField) {
+ assertField(paimonField, expectedInternalField, Collections.emptyList());
+ }
+
+ private void assertField(
+ DataField paimonField, InternalField expectedInternalField, List<String>
primaryKeys) {
+ TableSchema paimonSchema =
+ new TableSchema(
+ 0,
+ Collections.singletonList(paimonField),
+ 0,
+ Collections.emptyList(),
+ primaryKeys,
+ new HashMap<>(),
+ "");
+ InternalSchema internalSchema =
schemaExtractor.toInternalSchema(paimonSchema);
+ List<InternalField> recordKeyFields =
+ primaryKeys.isEmpty()
+ ? Collections.emptyList()
+ : Collections.singletonList(expectedInternalField);
+ InternalSchema expectedSchema =
+ InternalSchema.builder()
+ .name("record")
+ .dataType(InternalType.RECORD)
+ .fields(Collections.singletonList(expectedInternalField))
+ .recordKeyFields(recordKeyFields)
+ .build();
+ assertEquals(expectedSchema, internalSchema);
+ }
+
+ @Test
+ void testCharField() {
+ DataField paimonField = new DataField(0, "char_field", new CharType(10));
+ InternalField expectedField =
+ InternalField.builder()
+ .name("char_field")
+ .fieldId(0)
+ .schema(
+ InternalSchema.builder()
+ .name("CHAR(10)")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField);
+ }
+
+ @Test
+ void testVarcharField() {
+ DataField paimonField = new DataField(1, "varchar_field", new
VarCharType(255));
+ InternalField expectedField =
+ InternalField.builder()
+ .name("varchar_field")
+ .fieldId(1)
+ .schema(
+ InternalSchema.builder()
+ .name("VARCHAR(255)")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField);
+ }
+
+ @Test
+ void testBooleanField() {
+ DataField paimonField = new DataField(2, "boolean_field", new
BooleanType());
+ InternalField expectedField =
+ InternalField.builder()
+ .name("boolean_field")
+ .fieldId(2)
+ .schema(
+ InternalSchema.builder()
+ .name("BOOLEAN")
+ .dataType(InternalType.BOOLEAN)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField);
+ }
+
+ @Test
+ void testTinyIntField() {
+ DataField paimonField = new DataField(3, "tinyint_field", new
TinyIntType());
+ InternalField expectedField =
+ InternalField.builder()
+ .name("tinyint_field")
+ .fieldId(3)
+ .schema(
+ InternalSchema.builder()
+ .name("TINYINT")
+ .dataType(InternalType.INT)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField);
+ }
+
+ @Test
+ void testSmallIntField() {
+ DataField paimonField = new DataField(4, "smallint_field", new
SmallIntType());
+ InternalField expectedField =
+ InternalField.builder()
+ .name("smallint_field")
+ .fieldId(4)
+ .schema(
+ InternalSchema.builder()
+ .name("SMALLINT")
+ .dataType(InternalType.INT)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField);
+ }
+
+ @Test
+ void testIntField() {
+ DataField paimonField = new DataField(5, "int_field", new IntType());
+ InternalField expectedField =
+ InternalField.builder()
+ .name("int_field")
+ .fieldId(5)
+ .schema(
+ InternalSchema.builder()
+ .name("INT")
+ .dataType(InternalType.INT)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField);
+ }
+
+ @Test
+ void testBigIntField() {
+ DataField paimonField = new DataField(6, "bigint_field", new BigIntType());
+ InternalField expectedField =
+ InternalField.builder()
+ .name("bigint_field")
+ .fieldId(6)
+ .schema(
+ InternalSchema.builder()
+ .name("BIGINT")
+ .dataType(InternalType.LONG)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField);
+ }
+
+ @Test
+ void testFloatField() {
+ DataField paimonField = new DataField(7, "float_field", new FloatType());
+ InternalField expectedField =
+ InternalField.builder()
+ .name("float_field")
+ .fieldId(7)
+ .schema(
+ InternalSchema.builder()
+ .name("FLOAT")
+ .dataType(InternalType.FLOAT)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField);
+ }
+
+ @Test
+ void testDoubleField() {
+ DataField paimonField = new DataField(8, "double_field", new DoubleType());
+ InternalField expectedField =
+ InternalField.builder()
+ .name("double_field")
+ .fieldId(8)
+ .schema(
+ InternalSchema.builder()
+ .name("DOUBLE")
+ .dataType(InternalType.DOUBLE)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField);
+ }
+
+ @Test
+ void testDateField() {
+ DataField paimonField = new DataField(9, "date_field", new DateType());
+ InternalField expectedField =
+ InternalField.builder()
+ .name("date_field")
+ .fieldId(9)
+ .schema(
+ InternalSchema.builder()
+ .name("DATE")
+ .dataType(InternalType.DATE)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField);
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
+ void testTimestampField(int precision) {
+ DataField paimonField = new DataField(10, "timestamp_field", new
TimestampType(precision));
+
+ InternalSchema.MetadataValue expectedPrecision;
+ if (precision <= 3) {
+ expectedPrecision = InternalSchema.MetadataValue.MILLIS;
+ } else if (precision <= 6) {
+ expectedPrecision = InternalSchema.MetadataValue.MICROS;
+ } else {
+ expectedPrecision = InternalSchema.MetadataValue.NANOS;
+ }
+
+ Map<InternalSchema.MetadataKey, Object> timestampMetadata =
+
Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
expectedPrecision);
+ InternalField expectedField =
+ InternalField.builder()
+ .name("timestamp_field")
+ .fieldId(10)
+ .schema(
+ InternalSchema.builder()
+ .name("TIMESTAMP(" + precision + ")")
+ .dataType(InternalType.TIMESTAMP)
+ .isNullable(true)
+ .metadata(timestampMetadata)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField);
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {-1, 10})
+ void testInvalidTimestampPrecisionField(int invalidPrecision) {
+ assertThrows(IllegalArgumentException.class, () -> new
TimestampType(invalidPrecision));
+ }
+
+ @Test
+ void testDecimalField() {
+ DataField paimonField = new DataField(11, "decimal_field", new
DecimalType(10, 2));
+ Map<InternalSchema.MetadataKey, Object> decimalMetadata = new HashMap<>();
+ decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 10);
+ decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 2);
+ InternalField expectedField =
+ InternalField.builder()
+ .name("decimal_field")
+ .fieldId(11)
+ .schema(
+ InternalSchema.builder()
+ .name("DECIMAL(10, 2)")
+ .dataType(InternalType.DECIMAL)
+ .isNullable(true)
+ .metadata(decimalMetadata)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField);
+ }
+
+ @Test
+ void testStructField() {
+ DataField paimonField =
+ new DataField(
+ 12,
+ "struct_field",
+ RowType.of(
+ new DataType[] {
+ new IntType(),
+ new VarCharType(255),
+ RowType.of(new DataType[] {new DoubleType()}, new String[]
{"very_nested_double"})
+ },
+ new String[] {"nested_int", "nested_varchar",
"nested_struct"}));
+ InternalField nestedIntField =
+ InternalField.builder()
+ .name("nested_int")
+ .fieldId(0)
+ .parentPath("struct_field")
+ .schema(
+ InternalSchema.builder()
+ .name("INT")
+ .dataType(InternalType.INT)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ InternalField nestedVarcharField =
+ InternalField.builder()
+ .name("nested_varchar")
+ .fieldId(1)
+ .parentPath("struct_field")
+ .schema(
+ InternalSchema.builder()
+ .name("VARCHAR(255)")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ InternalField veryNestedDoubleField =
+ InternalField.builder()
+ .name("very_nested_double")
+ .fieldId(0)
+ .parentPath("struct_field.nested_struct")
+ .schema(
+ InternalSchema.builder()
+ .name("DOUBLE")
+ .dataType(InternalType.DOUBLE)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ InternalField nestedStructField =
+ InternalField.builder()
+ .name("nested_struct")
+ .fieldId(2)
+ .parentPath("struct_field")
+ .schema(
+ InternalSchema.builder()
+ .name("ROW<`very_nested_double` DOUBLE>")
+ .dataType(InternalType.RECORD)
+ .isNullable(true)
+ .fields(Collections.singletonList(veryNestedDoubleField))
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ InternalField expectedField =
+ InternalField.builder()
+ .name("struct_field")
+ .fieldId(12)
+ .schema(
+ InternalSchema.builder()
+ .name(
+ "ROW<`nested_int` INT, `nested_varchar` VARCHAR(255),
`nested_struct` ROW<`very_nested_double` DOUBLE>>")
+ .dataType(InternalType.RECORD)
+ .isNullable(true)
+ .fields(Arrays.asList(nestedIntField, nestedVarcharField,
nestedStructField))
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField);
+ }
+
+ @Test
+ void testArrayField() {
+ DataField paimonField = new DataField(13, "array_field", new ArrayType(new
IntType()));
+ InternalField arrayElementField =
+ InternalField.builder()
+ .name("_one_field_element")
+ .parentPath("array_field")
+ .schema(
+ InternalSchema.builder()
+ .name("INT")
+ .dataType(InternalType.INT)
+ .isNullable(true)
+ .build())
+ .build();
+ InternalField expectedField =
+ InternalField.builder()
+ .name("array_field")
+ .fieldId(13)
+ .schema(
+ InternalSchema.builder()
+ .name("ARRAY<INT>")
+ .dataType(InternalType.LIST)
+ .isNullable(true)
+ .fields(Collections.singletonList(arrayElementField))
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField);
+ }
+
+ @Test
+ void testMapField() {
+ DataField paimonField =
+ new DataField(14, "map_field", new MapType(new VarCharType(255), new
IntType()));
+ InternalField mapKeyField =
+ InternalField.builder()
+ .name("_one_field_key")
+ .parentPath("map_field")
+ .schema(
+ InternalSchema.builder()
+ .name("VARCHAR(255)")
+ .dataType(InternalType.STRING)
+ .isNullable(false)
+ .build())
+ .build();
+ InternalField mapValueField =
+ InternalField.builder()
+ .name("_one_field_value")
+ .parentPath("map_field")
+ .schema(
+ InternalSchema.builder()
+ .name("INT")
+ .dataType(InternalType.INT)
+ .isNullable(true)
+ .build())
+ .build();
+ InternalField expectedField =
+ InternalField.builder()
+ .name("map_field")
+ .fieldId(14)
+ .schema(
+ InternalSchema.builder()
+ .name("MAP<VARCHAR(255), INT>")
+ .dataType(InternalType.MAP)
+ .isNullable(true)
+ .fields(Arrays.asList(mapKeyField, mapValueField))
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField);
+ }
+
+ @Test
+ void testPrimaryKey() {
+ DataField paimonField = new DataField(0, "pk_field", new IntType());
+ InternalField expectedField =
+ InternalField.builder()
+ .name("pk_field")
+ .fieldId(0)
+ .schema(
+ InternalSchema.builder()
+ .name("INT")
+ .dataType(InternalType.INT)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ assertField(paimonField, expectedField,
Collections.singletonList("pk_field"));
+ }
+
+ @Test
+ void testMultiplePrimaryKeys() {
+ DataField intField = new DataField(0, "int_pk", new IntType());
+ DataField stringField = new DataField(1, "string_pk", new
VarCharType(255));
+ List<DataField> paimonFields = Arrays.asList(intField, stringField);
+ List<String> primaryKeys = Arrays.asList("int_pk", "string_pk");
+ TableSchema paimonSchema =
+ new TableSchema(
+ 0, paimonFields, 0, Collections.emptyList(), primaryKeys, new
HashMap<>(), "");
+ InternalSchema internalSchema =
schemaExtractor.toInternalSchema(paimonSchema);
+
+ InternalField expectedIntField =
+ InternalField.builder()
+ .name("int_pk")
+ .fieldId(0)
+ .schema(
+ InternalSchema.builder()
+ .name("INT")
+ .dataType(InternalType.INT)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ InternalField expectedStringField =
+ InternalField.builder()
+ .name("string_pk")
+ .fieldId(1)
+ .schema(
+ InternalSchema.builder()
+ .name("VARCHAR(255)")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ List<InternalField> expectedFields = Arrays.asList(expectedIntField,
expectedStringField);
+ InternalSchema expectedSchema =
+ InternalSchema.builder()
+ .name("record")
+ .dataType(InternalType.RECORD)
+ .fields(expectedFields)
+ .recordKeyFields(expectedFields)
+ .build();
+ assertEquals(expectedSchema, internalSchema);
+ }
+}
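
A minimal sketch of mapping a hand-built Paimon TableSchema to an InternalSchema, mirroring the TableSchema constructor arguments used in the tests above; the field names and wrapper class are illustrative, and a real caller would read the schema from a loaded FileStoreTable instead.

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;

import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.VarCharType;

import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.paimon.PaimonSchemaExtractor;

public class PaimonSchemaSketch {
  public static void main(String[] args) {
    // Hand-built schema with one primary key, using the same constructor shape as the tests above.
    TableSchema paimonSchema =
        new TableSchema(
            0,
            Arrays.asList(
                new DataField(0, "id", new IntType()),
                new DataField(1, "name", new VarCharType(255))),
            1,
            Collections.emptyList(),          // partition keys
            Collections.singletonList("id"),  // primary keys
            new HashMap<>(),                  // options
            "");                              // comment

    InternalSchema schema = PaimonSchemaExtractor.getInstance().toInternalSchema(paimonSchema);
    System.out.println(schema.getFields().size() + " fields mapped to InternalSchema");
  }
}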
diff --git a/xtable-service/pom.xml b/xtable-service/pom.xml
index 381aa3d0..ee4854d2 100644
--- a/xtable-service/pom.xml
+++ b/xtable-service/pom.xml
@@ -216,6 +216,16 @@
<scope>test</scope>
</dependency>
+ <!-- Paimon dependencies -->
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-bundle</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-spark-${spark.version.prefix}</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
diff --git
a/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java
b/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java
index 0e7a7e26..c87faee7 100644
---
a/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java
+++
b/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java
@@ -22,6 +22,7 @@ import static org.apache.xtable.GenericTable.getTableName;
import static org.apache.xtable.model.storage.TableFormat.DELTA;
import static org.apache.xtable.model.storage.TableFormat.HUDI;
import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PAIMON;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -91,12 +92,14 @@ public class ITConversionService {
Files.createDirectories(basePath);
SparkConf sparkConf = HudiTestUtil.getSparkConf(tempDir);
+
sparkSession =
SparkSession.builder().config(HoodieReadClient.addHoodieSupport(sparkConf)).getOrCreate();
sparkSession
.sparkContext()
.hadoopConfiguration()
.set("parquet.avro.write-old-list-structure", "false");
+
jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
} catch (IOException e) {
throw new RuntimeException(e);
@@ -116,14 +119,18 @@ public class ITConversionService {
new DeltaConversionSourceProvider();
ConversionSourceProvider<org.apache.iceberg.Snapshot>
icebergConversionSourceProvider =
new IcebergConversionSourceProvider();
+ ConversionSourceProvider<org.apache.paimon.Snapshot>
paimonConversionSourceProvider =
+ new org.apache.xtable.paimon.PaimonConversionSourceProvider();
hudiConversionSourceProvider.init(jsc.hadoopConfiguration());
deltaConversionSourceProvider.init(jsc.hadoopConfiguration());
icebergConversionSourceProvider.init(jsc.hadoopConfiguration());
+ paimonConversionSourceProvider.init(jsc.hadoopConfiguration());
sourceProviders.put(HUDI, hudiConversionSourceProvider);
sourceProviders.put(DELTA, deltaConversionSourceProvider);
sourceProviders.put(ICEBERG, icebergConversionSourceProvider);
+ sourceProviders.put(PAIMON, paimonConversionSourceProvider);
this.conversionService =
new ConversionService(
@@ -232,7 +239,7 @@ public class ITConversionService {
private static Stream<Arguments>
generateTestParametersFormatsAndPartitioning() {
List<Arguments> arguments = new ArrayList<>();
- for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG)) {
+ for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG,
PAIMON)) {
for (boolean isPartitioned : new boolean[] {true, false}) {
arguments.add(Arguments.of(sourceTableFormat, isPartitioned));
}
@@ -243,6 +250,7 @@ public class ITConversionService {
protected static List<String> getOtherFormats(String sourceTableFormat) {
return Arrays.stream(TableFormat.values())
.filter(format -> !format.equals(sourceTableFormat))
+ .filter(format -> !format.equals(PAIMON)) // Paimon target not
supported yet
.collect(Collectors.toList());
}