This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 73c86a9bf1 [hotfix] Minor refactor SparkCatalog (#6188)
73c86a9bf1 is described below

commit 73c86a9bf1b5cfcb9318b615f56324cdf5a3e3ce
Author: Zouxxyy <zouxinyu....@alibaba-inc.com>
AuthorDate: Wed Sep 3 16:59:46 2025 +0800

    [hotfix] Minor refactor SparkCatalog (#6188)
---
 .../java/org/apache/paimon/catalog/Catalog.java    |   4 +-
 .../java/org/apache/paimon/spark/SparkCatalog.java | 223 +++++++++++----------
 .../apache/paimon/spark/SparkGenericCatalog.java   |   6 +
 3 files changed, 123 insertions(+), 110 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 3109d7bc2d..702dc155a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -613,7 +613,7 @@ public interface Catalog extends AutoCloseable {
      * will throw an {@link UnsupportedOperationException}, affect the following methods:
      *
      * <ul>
-     *   <li>{@link #commitSnapshot(Identifier, Snapshot, List)}.
+     *   <li>{@link #commitSnapshot(Identifier, String, Snapshot, List)}.
      *   <li>{@link #loadSnapshot(Identifier)}.
      *   <li>{@link #rollbackTo(Identifier, Instant)}.
      *   <li>{@link #createBranch(Identifier, String, String)}.
@@ -787,6 +787,8 @@ public interface Catalog extends AutoCloseable {
     void alterPartitions(Identifier identifier, List<PartitionStatistics> partitions)
             throws TableNotExistException;
 
+    // ======================= Function methods ===============================
+
     /**
      * Get the names of all functions in this catalog.
      *
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index eee3991ad3..ac0367d43a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -96,6 +96,7 @@ import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.FORMAT_TABLE;
 import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
+import static org.apache.paimon.spark.SparkCatalogOptions.V1FUNCTION_ENABLED;
 import static org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY;
 import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
 import static org.apache.paimon.spark.util.OptionUtils.checkRequiredConfigurations;
@@ -137,9 +138,7 @@ public class SparkCatalog extends SparkBaseCatalog
         this.defaultDatabase =
                 options.getOrDefault(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
         this.v1FunctionEnabled =
-                options.getBoolean(
-                                SparkCatalogOptions.V1FUNCTION_ENABLED.key(),
-                                SparkCatalogOptions.V1FUNCTION_ENABLED.defaultValue())
+                options.getBoolean(V1FUNCTION_ENABLED.key(), V1FUNCTION_ENABLED.defaultValue())
                         && DelegateCatalog.rootCatalog(catalog) instanceof RESTCatalog;
         if (v1FunctionEnabled) {
             this.v1FunctionRegistry = new PaimonV1FunctionRegistry(sparkSession);
@@ -147,7 +146,7 @@ public class SparkCatalog extends SparkBaseCatalog
         try {
             catalog.getDatabase(defaultDatabase);
         } catch (Catalog.DatabaseNotExistException e) {
-            LOG.warn(
+            LOG.info(
                     "Default database '{}' does not exist, caused by: {}, 
start to create it",
                     defaultDatabase,
                     ExceptionUtils.stringifyException(e));
@@ -163,6 +162,8 @@ public class SparkCatalog extends SparkBaseCatalog
         return catalog;
     }
 
+    // ======================= database methods ===============================
+
     @Override
     public String[] defaultNamespace() {
         return new String[] {defaultDatabase};
@@ -259,6 +260,22 @@ public class SparkCatalog extends SparkBaseCatalog
         }
     }
 
+    @Override
+    public void alterNamespace(String[] namespace, NamespaceChange... changes)
+            throws NoSuchNamespaceException {
+        checkNamespace(namespace);
+        try {
+            String databaseName = getDatabaseNameFromNamespace(namespace);
+            List<PropertyChange> propertyChanges =
+                    Arrays.stream(changes).map(this::toPropertyChange).collect(Collectors.toList());
+            catalog.alterDatabase(databaseName, propertyChanges, false);
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new NoSuchNamespaceException(namespace);
+        }
+    }
+
+    // ======================= table methods ===============================
+
     @Override
     public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
         checkNamespace(namespace);
@@ -499,111 +516,7 @@ public class SparkCatalog extends SparkBaseCatalog
         }
     }
 
-    // --------------------- tools ------------------------------------------
-
-    protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
-            Identifier ident, Map<String, String> extraOptions) throws NoSuchTableException {
-        try {
-            org.apache.paimon.table.Table paimonTable = catalog.getTable(toIdentifier(ident));
-            if (paimonTable instanceof FormatTable) {
-                return convertToFileTable(ident, (FormatTable) paimonTable);
-            } else {
-                return new SparkTable(
-                        copyWithSQLConf(
-                                paimonTable, catalogName, toIdentifier(ident), extraOptions));
-            }
-        } catch (Catalog.TableNotExistException e) {
-            throw new NoSuchTableException(ident);
-        }
-    }
-
-    private static FileTable convertToFileTable(Identifier ident, FormatTable formatTable) {
-        SparkSession spark = PaimonSparkSession$.MODULE$.active();
-        StructType schema = SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
-        StructType partitionSchema =
-                SparkTypeUtils.fromPaimonRowType(
-                        TypeUtils.project(formatTable.rowType(), formatTable.partitionKeys()));
-        List<String> pathList = new ArrayList<>();
-        pathList.add(formatTable.location());
-        Options options = Options.fromMap(formatTable.options());
-        CaseInsensitiveStringMap dsOptions = new CaseInsensitiveStringMap(options.toMap());
-        if (formatTable.format() == FormatTable.Format.CSV) {
-            options.set("sep", options.get(CsvOptions.FIELD_DELIMITER));
-            dsOptions = new CaseInsensitiveStringMap(options.toMap());
-            return new PartitionedCSVTable(
-                    ident.name(),
-                    spark,
-                    dsOptions,
-                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
-                    scala.Option.apply(schema),
-                    CSVFileFormat.class,
-                    partitionSchema);
-        } else if (formatTable.format() == FormatTable.Format.ORC) {
-            return new PartitionedOrcTable(
-                    ident.name(),
-                    spark,
-                    dsOptions,
-                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
-                    scala.Option.apply(schema),
-                    OrcFileFormat.class,
-                    partitionSchema);
-        } else if (formatTable.format() == FormatTable.Format.PARQUET) {
-            return new PartitionedParquetTable(
-                    ident.name(),
-                    spark,
-                    dsOptions,
-                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
-                    scala.Option.apply(schema),
-                    ParquetFileFormat.class,
-                    partitionSchema);
-        } else if (formatTable.format() == FormatTable.Format.JSON) {
-            return new PartitionedJsonTable(
-                    ident.name(),
-                    spark,
-                    dsOptions,
-                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
-                    scala.Option.apply(schema),
-                    JsonFileFormat.class,
-                    partitionSchema);
-        } else {
-            throw new UnsupportedOperationException(
-                    "Unsupported format table "
-                            + ident.name()
-                            + " format "
-                            + formatTable.format().name());
-        }
-    }
-
-    protected List<String> convertPartitionTransforms(Transform[] transforms) {
-        List<String> partitionColNames = new ArrayList<>(transforms.length);
-        for (Transform transform : transforms) {
-            if (!(transform instanceof IdentityTransform)) {
-                throw new UnsupportedOperationException(
-                        "Unsupported partition transform: " + transform);
-            }
-            NamedReference ref = ((IdentityTransform) transform).ref();
-            if (!(ref instanceof FieldReference || ref.fieldNames().length != 1)) {
-                throw new UnsupportedOperationException(
-                        "Unsupported partition transform: " + transform);
-            }
-            partitionColNames.add(ref.fieldNames()[0]);
-        }
-        return partitionColNames;
-    }
-
-    @Override
-    public void alterNamespace(String[] namespace, NamespaceChange... changes)
-            throws NoSuchNamespaceException {
-        checkNamespace(namespace);
-        try {
-            String databaseName = getDatabaseNameFromNamespace(namespace);
-            List<PropertyChange> propertyChanges =
-                    Arrays.stream(changes).map(this::toPropertyChange).collect(Collectors.toList());
-            catalog.alterDatabase(databaseName, propertyChanges, false);
-        } catch (Catalog.DatabaseNotExistException e) {
-            throw new NoSuchNamespaceException(namespace);
-        }
-    }
+    // ======================= Function methods ===============================
 
     @Override
     public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
@@ -716,6 +629,98 @@ public class SparkCatalog extends SparkBaseCatalog
                 .dropFunction(V1FunctionConverter.fromFunctionIdentifier(funcIdent), ifExists);
     }
 
+    // ======================= Tools methods ===============================
+
+    protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
+            Identifier ident, Map<String, String> extraOptions) throws NoSuchTableException {
+        try {
+            org.apache.paimon.table.Table paimonTable = catalog.getTable(toIdentifier(ident));
+            if (paimonTable instanceof FormatTable) {
+                return convertToFileTable(ident, (FormatTable) paimonTable);
+            } else {
+                return new SparkTable(
+                        copyWithSQLConf(
+                                paimonTable, catalogName, toIdentifier(ident), extraOptions));
+            }
+        } catch (Catalog.TableNotExistException e) {
+            throw new NoSuchTableException(ident);
+        }
+    }
+
+    private static FileTable convertToFileTable(Identifier ident, FormatTable formatTable) {
+        SparkSession spark = PaimonSparkSession$.MODULE$.active();
+        StructType schema = SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
+        StructType partitionSchema =
+                SparkTypeUtils.fromPaimonRowType(
+                        TypeUtils.project(formatTable.rowType(), formatTable.partitionKeys()));
+        List<String> pathList = new ArrayList<>();
+        pathList.add(formatTable.location());
+        Options options = Options.fromMap(formatTable.options());
+        CaseInsensitiveStringMap dsOptions = new CaseInsensitiveStringMap(options.toMap());
+        if (formatTable.format() == FormatTable.Format.CSV) {
+            options.set("sep", options.get(CsvOptions.FIELD_DELIMITER));
+            dsOptions = new CaseInsensitiveStringMap(options.toMap());
+            return new PartitionedCSVTable(
+                    ident.name(),
+                    spark,
+                    dsOptions,
+                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+                    scala.Option.apply(schema),
+                    CSVFileFormat.class,
+                    partitionSchema);
+        } else if (formatTable.format() == FormatTable.Format.ORC) {
+            return new PartitionedOrcTable(
+                    ident.name(),
+                    spark,
+                    dsOptions,
+                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+                    scala.Option.apply(schema),
+                    OrcFileFormat.class,
+                    partitionSchema);
+        } else if (formatTable.format() == FormatTable.Format.PARQUET) {
+            return new PartitionedParquetTable(
+                    ident.name(),
+                    spark,
+                    dsOptions,
+                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+                    scala.Option.apply(schema),
+                    ParquetFileFormat.class,
+                    partitionSchema);
+        } else if (formatTable.format() == FormatTable.Format.JSON) {
+            return new PartitionedJsonTable(
+                    ident.name(),
+                    spark,
+                    dsOptions,
+                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+                    scala.Option.apply(schema),
+                    JsonFileFormat.class,
+                    partitionSchema);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported format table "
+                            + ident.name()
+                            + " format "
+                            + formatTable.format().name());
+        }
+    }
+
+    protected List<String> convertPartitionTransforms(Transform[] transforms) {
+        List<String> partitionColNames = new ArrayList<>(transforms.length);
+        for (Transform transform : transforms) {
+            if (!(transform instanceof IdentityTransform)) {
+                throw new UnsupportedOperationException(
+                        "Unsupported partition transform: " + transform);
+            }
+            NamedReference ref = ((IdentityTransform) transform).ref();
+            if (!(ref instanceof FieldReference || ref.fieldNames().length != 1)) {
+                throw new UnsupportedOperationException(
+                        "Unsupported partition transform: " + transform);
+            }
+            partitionColNames.add(ref.fieldNames()[0]);
+        }
+        return partitionColNames;
+    }
+
     private PropertyChange toPropertyChange(NamespaceChange change) {
         if (change instanceof NamespaceChange.SetProperty) {
             NamespaceChange.SetProperty set = (NamespaceChange.SetProperty) change;
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index e4563c492f..098b73a50b 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -93,6 +93,8 @@ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExte
         return this.sparkCatalog.paimonCatalog();
     }
 
+    // ======================= database methods ===============================
+
     @Override
     public String[] defaultNamespace() {
         return asNamespaceCatalog().defaultNamespace();
@@ -149,6 +151,8 @@ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExte
         return asNamespaceCatalog().dropNamespace(namespace, cascade);
     }
 
+    // ======================= table methods ===============================
+
     @Override
     public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
         // delegate to the session catalog because all tables share the same namespace
@@ -354,6 +358,8 @@ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExte
         return (FunctionCatalog) getDelegateCatalog();
     }
 
+    // ======================= Function methods ===============================
+
     @Override
     public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
         try {
