This is an automated email from the ASF dual-hosted git repository.
yuanfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new f5247a275 [AMORO-3441] Support Apache Paimon 1.1.1 (#3581)
f5247a275 is described below
commit f5247a275d02d01a8b3e9da673cfa8ba6b6f8da0
Author: ConradJam <[email protected]>
AuthorDate: Tue Jun 3 17:44:32 2025 +0800
[AMORO-3441] Support Apache Paimon 1.1.1 (#3581)
* [AMORO-3441] Support Apache Paimon 1.1.1
* [hotfix] exclude format-table objects in the Paimon catalog
* register the Paimon extension for Spark SQL sessions
* fix the Flink pom for differing Flink versions
* hotfix
---
.../amoro/server/terminal/SparkContextUtil.java | 6 +++++-
.../server/terminal/local/LocalSessionFactory.java | 6 +++++-
.../amoro-mixed-flink-common/pom.xml | 3 ++-
.../apache/amoro/spark/test/SparkTestContext.java | 1 +
.../v3.3/amoro-mixed-spark-3.3/pom.xml | 2 +-
.../apache/amoro/formats/paimon/PaimonCatalog.java | 14 ++++++++++++--
.../amoro/formats/paimon/PaimonCatalogFactory.java | 2 ++
.../amoro/formats/paimon/PaimonTableDescriptor.java | 20 ++++++++++----------
pom.xml | 6 +++---
9 files changed, 41 insertions(+), 19 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/SparkContextUtil.java
b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/SparkContextUtil.java
index b373c4625..4139414ee 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/SparkContextUtil.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/SparkContextUtil.java
@@ -29,6 +29,8 @@ public class SparkContextUtil {
public static final String ICEBERG_EXTENSION =
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
+ public static final String PAIMON_EXTENSION =
+ "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions";
public static final String MIXED_FORMAT_EXTENSION =
"org.apache.amoro.spark.MixedFormatSparkExtensions";
public static final String ICEBERG_CATALOG =
"org.apache.iceberg.spark.SparkCatalog";
@@ -46,7 +48,9 @@ public class SparkContextUtil {
public static Map<String, String> getSparkConf(Configurations sessionConfig)
{
Map<String, String> sparkConf = Maps.newLinkedHashMap();
- sparkConf.put("spark.sql.extensions", MIXED_FORMAT_EXTENSION + "," +
ICEBERG_EXTENSION);
+ sparkConf.put(
+ "spark.sql.extensions",
+ MIXED_FORMAT_EXTENSION + "," + ICEBERG_EXTENSION + "," +
PAIMON_EXTENSION);
List<String> catalogs =
sessionConfig.get(TerminalSessionFactory.SessionConfigOptions.CATALOGS);
String catalogUrlBase =
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/local/LocalSessionFactory.java
b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/local/LocalSessionFactory.java
index edbcd4515..c09e19d90 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/local/LocalSessionFactory.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/local/LocalSessionFactory.java
@@ -138,7 +138,11 @@ public class LocalSessionFactory implements
TerminalSessionFactory {
sparkconf.set("spark.network.timeout", "200s");
sparkconf.set(
"spark.sql.extensions",
- SparkContextUtil.MIXED_FORMAT_EXTENSION + "," +
SparkContextUtil.ICEBERG_EXTENSION);
+ SparkContextUtil.MIXED_FORMAT_EXTENSION
+ + ","
+ + SparkContextUtil.ICEBERG_EXTENSION
+ + ","
+ + SparkContextUtil.PAIMON_EXTENSION);
sparkconf.set("spark.cleaner.referenceTracking", "false");
for (String key : this.conf.keySet()) {
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml
index ab3fe3036..c91867df4 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml
@@ -88,8 +88,9 @@
<dependency>
<groupId>org.apache.paimon</groupId>
- <artifactId>paimon-flink-common</artifactId>
+ <artifactId>paimon-flink-1.17</artifactId>
<version>${paimon.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
diff --git
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
index 351264ece..20bdfa004 100644
---
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
+++
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
@@ -49,6 +49,7 @@ public class SparkTestContext {
public static final String MIXED_CATALOG_IMPL =
"org.apache.amoro.spark.MixedFormatSparkCatalog";
public static final String SQL_EXTENSIONS_IMPL =
"org.apache.amoro.spark.MixedFormatSparkExtensions"
+ + ",org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions"
+
",org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
public static final String UNIFIED_CATALOG_IMP =
"org.apache.amoro.spark.SparkUnifiedCatalog";
diff --git
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/pom.xml
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/pom.xml
index a1b3276fa..949c3494c 100644
--- a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/pom.xml
+++ b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/pom.xml
@@ -35,7 +35,7 @@
<properties>
<hive.version>2.3.9</hive.version>
- <spark.version>3.3.2</spark.version>
+ <spark.version>3.3.4</spark.version>
<scala.version>2.12.15</scala.version>
<scala.collection.compat>2.11.0</scala.collection.compat>
</properties>
diff --git
a/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalog.java
b/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalog.java
index b795f0332..2af18a941 100644
---
a/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalog.java
+++
b/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalog.java
@@ -48,12 +48,22 @@ public class PaimonCatalog implements FormatCatalog {
@Override
public boolean databaseExists(String database) {
- return catalog.databaseExists(database);
+ try {
+ catalog.getDatabase(database);
+ return true;
+ } catch (Catalog.DatabaseNotExistException e) {
+ return false;
+ }
}
@Override
public boolean tableExists(String database, String table) {
- return catalog.tableExists(Identifier.create(database, table));
+ try {
+ catalog.getTable(Identifier.create(database, table));
+ return true;
+ } catch (Catalog.TableNotExistException e) {
+ return false;
+ }
}
@Override
diff --git
a/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalogFactory.java
b/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalogFactory.java
index b61486376..4787628ac 100644
---
a/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalogFactory.java
+++
b/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalogFactory.java
@@ -47,6 +47,8 @@ public class PaimonCatalogFactory implements
FormatCatalogFactory {
String name, String metastoreType, Map<String, String> properties,
TableMetaStore metaStore) {
Optional<URL> hiveSiteLocation = metaStore.getHiveSiteLocation();
Map<String, String> catalogProperties = Maps.newHashMap();
+ // If format-table support is enabled, Paimon loads Hive ORC/Parquet/CSV
tables as Paimon tables; disable it by default (user properties may override)
+ catalogProperties.put(CatalogOptions.FORMAT_TABLE_ENABLED.key(), "false");
catalogProperties.putAll(properties);
hiveSiteLocation.ifPresent(
diff --git
a/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonTableDescriptor.java
b/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonTableDescriptor.java
index b21766219..534501af3 100644
---
a/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonTableDescriptor.java
+++
b/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonTableDescriptor.java
@@ -190,7 +190,8 @@ public class PaimonTableDescriptor implements
FormatTableDescriptor {
throw new RuntimeException(e);
}
} else {
- snapshots =
Collections.singleton(table.tagManager().taggedSnapshot(ref)).iterator();
+ snapshots =
+
Collections.singleton(table.tagManager().getOrThrow(ref).trimToSnapshot()).iterator();
}
FileStore<?> store = table.store();
@@ -233,7 +234,7 @@ public class PaimonTableDescriptor implements
FormatTableDescriptor {
if (BranchManager.isMainBranch(ref) ||
table.branchManager().branchExists(ref)) {
snapshot =
table.snapshotManager().copyWithBranch(ref).snapshot(commitId);
} else {
- snapshot = table.tagManager().tag(ref);
+ snapshot = table.tagManager().getOrThrow(ref).trimToSnapshot();
}
FileStore<?> store = table.store();
@@ -243,7 +244,6 @@ public class PaimonTableDescriptor implements
FormatTableDescriptor {
List<ManifestFileMeta> manifestFileMetas =
manifestList.readDeltaManifests(snapshot);
for (ManifestFileMeta manifestFileMeta : manifestFileMetas) {
- manifestFileMeta.fileSize();
List<ManifestEntry> manifestEntries =
manifestFile.read(manifestFileMeta.fileName());
for (ManifestEntry entry : manifestEntries) {
amsDataFileInfos.add(
@@ -275,12 +275,12 @@ public class PaimonTableDescriptor implements
FormatTableDescriptor {
FileStore<?> store = table.store();
FileStorePathFactory fileStorePathFactory = store.pathFactory();
List<ManifestEntry> files = store.newScan().plan().files(FileKind.ADD);
- Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupByPartFiles =
groupByPartFiles(files);
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>> groupByPartFiles =
groupByPartFiles(files);
List<PartitionBaseInfo> partitionBaseInfoList = new ArrayList<>();
- for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>>
groupByPartitionEntry :
+ for (Map.Entry<BinaryRow, Map<Integer, List<ManifestEntry>>>
groupByPartitionEntry :
groupByPartFiles.entrySet()) {
- for (Map.Entry<Integer, List<DataFileMeta>> groupByBucketEntry :
+ for (Map.Entry<Integer, List<ManifestEntry>> groupByBucketEntry :
groupByPartitionEntry.getValue().entrySet()) {
String partitionSt =
partitionString(
@@ -288,10 +288,10 @@ public class PaimonTableDescriptor implements
FormatTableDescriptor {
int fileCount = 0;
long fileSize = 0;
long lastCommitTime = 0;
- for (DataFileMeta dataFileMeta : groupByBucketEntry.getValue()) {
+ for (ManifestEntry manifestEntry : groupByBucketEntry.getValue()) {
fileCount++;
- fileSize += dataFileMeta.fileSize();
- lastCommitTime = Math.max(lastCommitTime,
dataFileMeta.creationTimeEpochMillis());
+ fileSize += manifestEntry.file().fileSize();
+ lastCommitTime = Math.max(lastCommitTime,
manifestEntry.file().creationTimeEpochMillis());
}
partitionBaseInfoList.add(
new PartitionBaseInfo(partitionSt, 0, fileCount, fileSize,
lastCommitTime));
@@ -621,7 +621,7 @@ public class PaimonTableDescriptor implements
FormatTableDescriptor {
return store
.pathFactory()
.createDataFilePathFactory(manifestEntry.partition(),
manifestEntry.bucket())
- .toPath(manifestEntry.file().fileName())
+ .toPath(manifestEntry.file())
.toString();
}
diff --git a/pom.xml b/pom.xml
index f98479380..f8e83f1f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,7 +103,7 @@
<maven-checkstyle-plugin.version>3.3.1</maven-checkstyle-plugin.version>
<iceberg.version>1.6.1</iceberg.version>
- <paimon.version>0.9.0</paimon.version>
+ <paimon.version>1.1.1</paimon.version>
<hive.version>3.1.3</hive.version>
<hadoop.version>3.4.0</hadoop.version>
<kerby.version>2.0.3</kerby.version>
@@ -124,7 +124,7 @@
<orc-core.version>1.8.3</orc-core.version>
<awssdk.version>2.24.12</awssdk.version>
<aliyun-sdk-oss.version>3.10.2</aliyun-sdk-oss.version>
- <terminal.spark.version>3.3.2</terminal.spark.version>
+ <terminal.spark.version>3.3.4</terminal.spark.version>
<terminal.spark.major.version>3.3</terminal.spark.major.version>
<dropwizard.metrics.version>4.2.19</dropwizard.metrics.version>
<caffeine.version>2.9.3</caffeine.version>
@@ -1380,7 +1380,7 @@
<!-- Do not use Hive 2.3.9 for Hive 4 Metastore support due to
incompatible API changes. -->
<hive.version>2.3.8</hive.version>
<hadoop.version>2.10.2</hadoop.version>
- <terminal.spark.version>3.3.3</terminal.spark.version>
+ <terminal.spark.version>3.3.4</terminal.spark.version>
<terminal.spark.major.version>3.3</terminal.spark.major.version>
</properties>
</profile>