This is an automated email from the ASF dual-hosted git repository.

yuanfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new f5247a275 [AMORO-3441] Support Apache Paimon 1.1.1 (#3581)
f5247a275 is described below

commit f5247a275d02d01a8b3e9da673cfa8ba6b6f8da0
Author: ConradJam <[email protected]>
AuthorDate: Tue Jun 3 17:44:32 2025 +0800

    [AMORO-3441] Support Apache Paimon 1.1.1 (#3581)
    
    * [AMORO-3441] Support Apache Paimon 1.1.1
    
    * [hotfix] paimon exclude format table object
    
    * paimon extension with spark sql
    
    * fix flink pom by diff flink version
    
    * hot fix
---
 .../amoro/server/terminal/SparkContextUtil.java      |  6 +++++-
 .../server/terminal/local/LocalSessionFactory.java   |  6 +++++-
 .../amoro-mixed-flink-common/pom.xml                 |  3 ++-
 .../apache/amoro/spark/test/SparkTestContext.java    |  1 +
 .../v3.3/amoro-mixed-spark-3.3/pom.xml               |  2 +-
 .../apache/amoro/formats/paimon/PaimonCatalog.java   | 14 ++++++++++++--
 .../amoro/formats/paimon/PaimonCatalogFactory.java   |  2 ++
 .../amoro/formats/paimon/PaimonTableDescriptor.java  | 20 ++++++++++----------
 pom.xml                                              |  6 +++---
 9 files changed, 41 insertions(+), 19 deletions(-)

diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/SparkContextUtil.java b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/SparkContextUtil.java
index b373c4625..4139414ee 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/SparkContextUtil.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/SparkContextUtil.java
@@ -29,6 +29,8 @@ public class SparkContextUtil {
 
   public static final String ICEBERG_EXTENSION =
       "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
+  public static final String PAIMON_EXTENSION =
+      "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions";
   public static final String MIXED_FORMAT_EXTENSION =
       "org.apache.amoro.spark.MixedFormatSparkExtensions";
   public static final String ICEBERG_CATALOG = "org.apache.iceberg.spark.SparkCatalog";
@@ -46,7 +48,9 @@ public class SparkContextUtil {
 
   public static Map<String, String> getSparkConf(Configurations sessionConfig) {
     Map<String, String> sparkConf = Maps.newLinkedHashMap();
-    sparkConf.put("spark.sql.extensions", MIXED_FORMAT_EXTENSION + "," + ICEBERG_EXTENSION);
+    sparkConf.put(
+        "spark.sql.extensions",
+        MIXED_FORMAT_EXTENSION + "," + ICEBERG_EXTENSION + "," + PAIMON_EXTENSION);
 
     List<String> catalogs = sessionConfig.get(TerminalSessionFactory.SessionConfigOptions.CATALOGS);
     String catalogUrlBase =
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/local/LocalSessionFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/local/LocalSessionFactory.java
index edbcd4515..c09e19d90 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/local/LocalSessionFactory.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/local/LocalSessionFactory.java
@@ -138,7 +138,11 @@ public class LocalSessionFactory implements TerminalSessionFactory {
       sparkconf.set("spark.network.timeout", "200s");
       sparkconf.set(
           "spark.sql.extensions",
-          SparkContextUtil.MIXED_FORMAT_EXTENSION + "," + SparkContextUtil.ICEBERG_EXTENSION);
+          SparkContextUtil.MIXED_FORMAT_EXTENSION
+              + ","
+              + SparkContextUtil.ICEBERG_EXTENSION
+              + ","
+              + SparkContextUtil.PAIMON_EXTENSION);
       sparkconf.set("spark.cleaner.referenceTracking", "false");
 
       for (String key : this.conf.keySet()) {
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml
index ab3fe3036..c91867df4 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml
@@ -88,8 +88,9 @@
 
         <dependency>
             <groupId>org.apache.paimon</groupId>
-            <artifactId>paimon-flink-common</artifactId>
+            <artifactId>paimon-flink-1.17</artifactId>
             <version>${paimon.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
diff --git a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
index 351264ece..20bdfa004 100644
--- a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
+++ b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
@@ -49,6 +49,7 @@ public class SparkTestContext {
   public static final String MIXED_CATALOG_IMPL = "org.apache.amoro.spark.MixedFormatSparkCatalog";
   public static final String SQL_EXTENSIONS_IMPL =
       "org.apache.amoro.spark.MixedFormatSparkExtensions"
+          + ",org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions"
           + ",org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
 
   public static final String UNIFIED_CATALOG_IMP = "org.apache.amoro.spark.SparkUnifiedCatalog";
diff --git a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/pom.xml b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/pom.xml
index a1b3276fa..949c3494c 100644
--- a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/pom.xml
+++ b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/pom.xml
@@ -35,7 +35,7 @@
 
     <properties>
         <hive.version>2.3.9</hive.version>
-        <spark.version>3.3.2</spark.version>
+        <spark.version>3.3.4</spark.version>
         <scala.version>2.12.15</scala.version>
         <scala.collection.compat>2.11.0</scala.collection.compat>
     </properties>
diff --git a/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalog.java b/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalog.java
index b795f0332..2af18a941 100644
--- a/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalog.java
+++ b/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalog.java
@@ -48,12 +48,22 @@ public class PaimonCatalog implements FormatCatalog {
 
   @Override
   public boolean databaseExists(String database) {
-    return catalog.databaseExists(database);
+    try {
+      catalog.getDatabase(database);
+      return true;
+    } catch (Catalog.DatabaseNotExistException e) {
+      return false;
+    }
   }
 
   @Override
   public boolean tableExists(String database, String table) {
-    return catalog.tableExists(Identifier.create(database, table));
+    try {
+      catalog.getTable(Identifier.create(database, table));
+      return true;
+    } catch (Catalog.TableNotExistException e) {
+      return false;
+    }
   }
 
   @Override
diff --git a/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalogFactory.java b/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalogFactory.java
index b61486376..4787628ac 100644
--- a/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalogFactory.java
+++ b/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonCatalogFactory.java
@@ -47,6 +47,8 @@ public class PaimonCatalogFactory implements FormatCatalogFactory {
       String name, String metastoreType, Map<String, String> properties, TableMetaStore metaStore) {
     Optional<URL> hiveSiteLocation = metaStore.getHiveSiteLocation();
     Map<String, String> catalogProperties = Maps.newHashMap();
+    // if format table enabled, paimon will load hive orc/parquet/csv table to paimon table
+    catalogProperties.put(CatalogOptions.FORMAT_TABLE_ENABLED.key(), "false");
     catalogProperties.putAll(properties);
 
     hiveSiteLocation.ifPresent(
diff --git a/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonTableDescriptor.java b/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonTableDescriptor.java
index b21766219..534501af3 100644
--- a/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonTableDescriptor.java
+++ b/amoro-format-paimon/src/main/java/org/apache/amoro/formats/paimon/PaimonTableDescriptor.java
@@ -190,7 +190,8 @@ public class PaimonTableDescriptor implements FormatTableDescriptor {
         throw new RuntimeException(e);
       }
     } else {
-      snapshots = Collections.singleton(table.tagManager().taggedSnapshot(ref)).iterator();
+      snapshots =
+          Collections.singleton(table.tagManager().getOrThrow(ref).trimToSnapshot()).iterator();
     }
 
     FileStore<?> store = table.store();
@@ -233,7 +234,7 @@ public class PaimonTableDescriptor implements FormatTableDescriptor {
     if (BranchManager.isMainBranch(ref) || table.branchManager().branchExists(ref)) {
       snapshot = table.snapshotManager().copyWithBranch(ref).snapshot(commitId);
     } else {
-      snapshot = table.tagManager().tag(ref);
+      snapshot = table.tagManager().getOrThrow(ref).trimToSnapshot();
     }
 
     FileStore<?> store = table.store();
@@ -243,7 +244,6 @@ public class PaimonTableDescriptor implements FormatTableDescriptor {
 
     List<ManifestFileMeta> manifestFileMetas = manifestList.readDeltaManifests(snapshot);
     for (ManifestFileMeta manifestFileMeta : manifestFileMetas) {
-      manifestFileMeta.fileSize();
       List<ManifestEntry> manifestEntries = manifestFile.read(manifestFileMeta.fileName());
       for (ManifestEntry entry : manifestEntries) {
         amsDataFileInfos.add(
@@ -275,12 +275,12 @@ public class PaimonTableDescriptor implements FormatTableDescriptor {
     FileStore<?> store = table.store();
     FileStorePathFactory fileStorePathFactory = store.pathFactory();
     List<ManifestEntry> files = store.newScan().plan().files(FileKind.ADD);
-    Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupByPartFiles = groupByPartFiles(files);
+    Map<BinaryRow, Map<Integer, List<ManifestEntry>>> groupByPartFiles = groupByPartFiles(files);
 
     List<PartitionBaseInfo> partitionBaseInfoList = new ArrayList<>();
-    for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> groupByPartitionEntry :
+    for (Map.Entry<BinaryRow, Map<Integer, List<ManifestEntry>>> groupByPartitionEntry :
         groupByPartFiles.entrySet()) {
-      for (Map.Entry<Integer, List<DataFileMeta>> groupByBucketEntry :
+      for (Map.Entry<Integer, List<ManifestEntry>> groupByBucketEntry :
           groupByPartitionEntry.getValue().entrySet()) {
         String partitionSt =
             partitionString(
@@ -288,10 +288,10 @@ public class PaimonTableDescriptor implements FormatTableDescriptor {
         int fileCount = 0;
         long fileSize = 0;
         long lastCommitTime = 0;
-        for (DataFileMeta dataFileMeta : groupByBucketEntry.getValue()) {
+        for (ManifestEntry manifestEntry : groupByBucketEntry.getValue()) {
           fileCount++;
-          fileSize += dataFileMeta.fileSize();
-          lastCommitTime = Math.max(lastCommitTime, dataFileMeta.creationTimeEpochMillis());
+          fileSize += manifestEntry.file().fileSize();
+          lastCommitTime = Math.max(lastCommitTime, manifestEntry.file().creationTimeEpochMillis());
         }
         partitionBaseInfoList.add(
             new PartitionBaseInfo(partitionSt, 0, fileCount, fileSize, lastCommitTime));
@@ -621,7 +621,7 @@ public class PaimonTableDescriptor implements FormatTableDescriptor {
     return store
         .pathFactory()
         .createDataFilePathFactory(manifestEntry.partition(), manifestEntry.bucket())
-        .toPath(manifestEntry.file().fileName())
+        .toPath(manifestEntry.file())
         .toString();
   }
 
diff --git a/pom.xml b/pom.xml
index f98479380..f8e83f1f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,7 +103,7 @@
         <maven-checkstyle-plugin.version>3.3.1</maven-checkstyle-plugin.version>
 
         <iceberg.version>1.6.1</iceberg.version>
-        <paimon.version>0.9.0</paimon.version>
+        <paimon.version>1.1.1</paimon.version>
         <hive.version>3.1.3</hive.version>
         <hadoop.version>3.4.0</hadoop.version>
         <kerby.version>2.0.3</kerby.version>
@@ -124,7 +124,7 @@
         <orc-core.version>1.8.3</orc-core.version>
         <awssdk.version>2.24.12</awssdk.version>
         <aliyun-sdk-oss.version>3.10.2</aliyun-sdk-oss.version>
-        <terminal.spark.version>3.3.2</terminal.spark.version>
+        <terminal.spark.version>3.3.4</terminal.spark.version>
         <terminal.spark.major.version>3.3</terminal.spark.major.version>
         <dropwizard.metrics.version>4.2.19</dropwizard.metrics.version>
         <caffeine.version>2.9.3</caffeine.version>
@@ -1380,7 +1380,7 @@
                 <!-- Do not use Hive 2.3.9 for Hive 4 Metastore support due to incompatible API changes. -->
                 <hive.version>2.3.8</hive.version>
                 <hadoop.version>2.10.2</hadoop.version>
-                <terminal.spark.version>3.3.3</terminal.spark.version>
+                <terminal.spark.version>3.3.4</terminal.spark.version>
                 <terminal.spark.major.version>3.3</terminal.spark.major.version>
             </properties>
         </profile>

Reply via email to