This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 73c86a9bf1 [hotfix] Minor refactor SparkCatalog (#6188)
73c86a9bf1 is described below

commit 73c86a9bf1b5cfcb9318b615f56324cdf5a3e3ce
Author: Zouxxyy <zouxinyu....@alibaba-inc.com>
AuthorDate: Wed Sep 3 16:59:46 2025 +0800

    [hotfix] Minor refactor SparkCatalog (#6188)
---
 .../java/org/apache/paimon/catalog/Catalog.java    |   4 +-
 .../java/org/apache/paimon/spark/SparkCatalog.java | 223 +++++++++++----------
 .../apache/paimon/spark/SparkGenericCatalog.java   |   6 +
 3 files changed, 123 insertions(+), 110 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 3109d7bc2d..702dc155a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -613,7 +613,7 @@ public interface Catalog extends AutoCloseable {
      * will throw an {@link UnsupportedOperationException}, affect the following methods:
      *
      * <ul>
-     *   <li>{@link #commitSnapshot(Identifier, Snapshot, List)}.
+     *   <li>{@link #commitSnapshot(Identifier, String, Snapshot, List)}.
      *   <li>{@link #loadSnapshot(Identifier)}.
      *   <li>{@link #rollbackTo(Identifier, Instant)}.
      *   <li>{@link #createBranch(Identifier, String, String)}.
@@ -787,6 +787,8 @@ public interface Catalog extends AutoCloseable {
     void alterPartitions(Identifier identifier, List<PartitionStatistics> partitions)
             throws TableNotExistException;
 
+    // ======================= Function methods ===============================
+
     /**
      * Get the names of all functions in this catalog.
      *
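For context on the corrected javadoc reference: the list enumerates methods that throw UnsupportedOperationException on catalogs without version management. A minimal guard sketch, assuming `catalog` is an open org.apache.paimon.catalog.Catalog and the database/table names are hypothetical:

    import org.apache.paimon.catalog.Identifier;

    try {
        // loadSnapshot is one of the methods enumerated in the javadoc above
        catalog.loadSnapshot(Identifier.create("my_db", "my_table"));
    } catch (UnsupportedOperationException e) {
        // catalog without version management support; fall back or skip
    }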
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index eee3991ad3..ac0367d43a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -96,6 +96,7 @@ import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.FORMAT_TABLE;
 import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
+import static org.apache.paimon.spark.SparkCatalogOptions.V1FUNCTION_ENABLED;
 import static org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY;
 import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
 import static org.apache.paimon.spark.util.OptionUtils.checkRequiredConfigurations;
@@ -137,9 +138,7 @@ public class SparkCatalog extends SparkBaseCatalog
         this.defaultDatabase =
                 options.getOrDefault(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
         this.v1FunctionEnabled =
-                options.getBoolean(
-                                SparkCatalogOptions.V1FUNCTION_ENABLED.key(),
-                                SparkCatalogOptions.V1FUNCTION_ENABLED.defaultValue())
+                options.getBoolean(V1FUNCTION_ENABLED.key(), V1FUNCTION_ENABLED.defaultValue())
                         && DelegateCatalog.rootCatalog(catalog) instanceof RESTCatalog;
         if (v1FunctionEnabled) {
             this.v1FunctionRegistry = new PaimonV1FunctionRegistry(sparkSession);
@@ -147,7 +146,7 @@
         try {
            catalog.getDatabase(defaultDatabase);
         } catch (Catalog.DatabaseNotExistException e) {
-            LOG.warn(
+            LOG.info(
                     "Default database '{}' does not exist, caused by: {}, start to create it",
                     defaultDatabase,
                     ExceptionUtils.stringifyException(e));
@@ -163,6 +162,8 @@ public class SparkCatalog extends SparkBaseCatalog
         return catalog;
     }
 
+    // ======================= database methods ===============================
+
     @Override
     public String[] defaultNamespace() {
         return new String[] {defaultDatabase};
@@ -259,6 +260,22 @@ public class SparkCatalog extends SparkBaseCatalog
         }
     }
 
+    @Override
+    public void alterNamespace(String[] namespace, NamespaceChange... changes)
+            throws NoSuchNamespaceException {
+        checkNamespace(namespace);
+        try {
+            String databaseName = getDatabaseNameFromNamespace(namespace);
+            List<PropertyChange> propertyChanges =
+                    Arrays.stream(changes).map(this::toPropertyChange).collect(Collectors.toList());
+            catalog.alterDatabase(databaseName, propertyChanges, false);
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new NoSuchNamespaceException(namespace);
+        }
+    }
+
+    // ======================= table methods ===============================
+
     @Override
     public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
         checkNamespace(namespace);
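The alterNamespace override relocated under the "database methods" banner is the hook behind Spark's ALTER NAMESPACE ... SET PROPERTIES path. A minimal sketch of the call it receives, assuming `sparkCatalog` is an initialized SparkCatalog and the namespace/property names are hypothetical:

    import org.apache.spark.sql.connector.catalog.NamespaceChange;

    NamespaceChange set = NamespaceChange.setProperty("owner", "data-platform");
    NamespaceChange remove = NamespaceChange.removeProperty("obsolete.key");
    // each change is translated to a Paimon PropertyChange via toPropertyChange
    sparkCatalog.alterNamespace(new String[] {"my_db"}, set, remove);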
@@ -499,111 +516,7 @@ public class SparkCatalog extends SparkBaseCatalog
         }
     }
 
-    // --------------------- tools ------------------------------------------
-
-    protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
-            Identifier ident, Map<String, String> extraOptions) throws NoSuchTableException {
-        try {
-            org.apache.paimon.table.Table paimonTable = catalog.getTable(toIdentifier(ident));
-            if (paimonTable instanceof FormatTable) {
-                return convertToFileTable(ident, (FormatTable) paimonTable);
-            } else {
-                return new SparkTable(
-                        copyWithSQLConf(
-                                paimonTable, catalogName, toIdentifier(ident), extraOptions));
-            }
-        } catch (Catalog.TableNotExistException e) {
-            throw new NoSuchTableException(ident);
-        }
-    }
-
-    private static FileTable convertToFileTable(Identifier ident, FormatTable formatTable) {
-        SparkSession spark = PaimonSparkSession$.MODULE$.active();
-        StructType schema = SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
-        StructType partitionSchema =
-                SparkTypeUtils.fromPaimonRowType(
-                        TypeUtils.project(formatTable.rowType(), formatTable.partitionKeys()));
-        List<String> pathList = new ArrayList<>();
-        pathList.add(formatTable.location());
-        Options options = Options.fromMap(formatTable.options());
-        CaseInsensitiveStringMap dsOptions = new CaseInsensitiveStringMap(options.toMap());
-        if (formatTable.format() == FormatTable.Format.CSV) {
-            options.set("sep", options.get(CsvOptions.FIELD_DELIMITER));
-            dsOptions = new CaseInsensitiveStringMap(options.toMap());
-            return new PartitionedCSVTable(
-                    ident.name(),
-                    spark,
-                    dsOptions,
-                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
-                    scala.Option.apply(schema),
-                    CSVFileFormat.class,
-                    partitionSchema);
-        } else if (formatTable.format() == FormatTable.Format.ORC) {
-            return new PartitionedOrcTable(
-                    ident.name(),
-                    spark,
-                    dsOptions,
-                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
-                    scala.Option.apply(schema),
-                    OrcFileFormat.class,
-                    partitionSchema);
-        } else if (formatTable.format() == FormatTable.Format.PARQUET) {
-            return new PartitionedParquetTable(
-                    ident.name(),
-                    spark,
-                    dsOptions,
-                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
-                    scala.Option.apply(schema),
-                    ParquetFileFormat.class,
-                    partitionSchema);
-        } else if (formatTable.format() == FormatTable.Format.JSON) {
-            return new PartitionedJsonTable(
-                    ident.name(),
-                    spark,
-                    dsOptions,
-                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
-                    scala.Option.apply(schema),
-                    JsonFileFormat.class,
-                    partitionSchema);
-        } else {
-            throw new UnsupportedOperationException(
-                    "Unsupported format table "
-                            + ident.name()
-                            + " format "
-                            + formatTable.format().name());
-        }
-    }
-
-    protected List<String> convertPartitionTransforms(Transform[] transforms) {
-        List<String> partitionColNames = new ArrayList<>(transforms.length);
-        for (Transform transform : transforms) {
-            if (!(transform instanceof IdentityTransform)) {
-                throw new UnsupportedOperationException(
-                        "Unsupported partition transform: " + transform);
-            }
-            NamedReference ref = ((IdentityTransform) transform).ref();
-            if (!(ref instanceof FieldReference || ref.fieldNames().length != 1)) {
-                throw new UnsupportedOperationException(
-                        "Unsupported partition transform: " + transform);
-            }
-            partitionColNames.add(ref.fieldNames()[0]);
-        }
-        return partitionColNames;
-    }
-
-    @Override
-    public void alterNamespace(String[] namespace, NamespaceChange... changes)
-            throws NoSuchNamespaceException {
-        checkNamespace(namespace);
-        try {
-            String databaseName = getDatabaseNameFromNamespace(namespace);
-            List<PropertyChange> propertyChanges =
-                    Arrays.stream(changes).map(this::toPropertyChange).collect(Collectors.toList());
-            catalog.alterDatabase(databaseName, propertyChanges, false);
-        } catch (Catalog.DatabaseNotExistException e) {
-            throw new NoSuchNamespaceException(namespace);
-        }
-    }
+    // ======================= Function methods ===============================
 
     @Override
     public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
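The tools block removed above is re-added unchanged as a "Tools methods" section in the next hunk. Either branch of loadSparkTable (a native Spark FileTable for format tables, a SparkTable wrapper otherwise) resolves through the ordinary read path; a sketch assuming `spark` is an active SparkSession and the catalog/database/table names are hypothetical:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // looked up through the Paimon Spark catalog registered as "paimon"
    Dataset<Row> df = spark.table("paimon.my_db.my_csv_table");
    df.show();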
@@ -716,6 +629,98 @@ public class SparkCatalog extends SparkBaseCatalog
                 .dropFunction(V1FunctionConverter.fromFunctionIdentifier(funcIdent), ifExists);
     }
 
+    // ======================= Tools methods ===============================
+
+    protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
+            Identifier ident, Map<String, String> extraOptions) throws NoSuchTableException {
+        try {
+            org.apache.paimon.table.Table paimonTable = catalog.getTable(toIdentifier(ident));
+            if (paimonTable instanceof FormatTable) {
+                return convertToFileTable(ident, (FormatTable) paimonTable);
+            } else {
+                return new SparkTable(
+                        copyWithSQLConf(
+                                paimonTable, catalogName, toIdentifier(ident), extraOptions));
+            }
+        } catch (Catalog.TableNotExistException e) {
+            throw new NoSuchTableException(ident);
+        }
+    }
+
+    private static FileTable convertToFileTable(Identifier ident, FormatTable formatTable) {
+        SparkSession spark = PaimonSparkSession$.MODULE$.active();
+        StructType schema = SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
+        StructType partitionSchema =
+                SparkTypeUtils.fromPaimonRowType(
+                        TypeUtils.project(formatTable.rowType(), formatTable.partitionKeys()));
+        List<String> pathList = new ArrayList<>();
+        pathList.add(formatTable.location());
+        Options options = Options.fromMap(formatTable.options());
+        CaseInsensitiveStringMap dsOptions = new CaseInsensitiveStringMap(options.toMap());
+        if (formatTable.format() == FormatTable.Format.CSV) {
+            options.set("sep", options.get(CsvOptions.FIELD_DELIMITER));
+            dsOptions = new CaseInsensitiveStringMap(options.toMap());
+            return new PartitionedCSVTable(
+                    ident.name(),
+                    spark,
+                    dsOptions,
+                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+                    scala.Option.apply(schema),
+                    CSVFileFormat.class,
+                    partitionSchema);
+        } else if (formatTable.format() == FormatTable.Format.ORC) {
+            return new PartitionedOrcTable(
+                    ident.name(),
+                    spark,
+                    dsOptions,
+                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+                    scala.Option.apply(schema),
+                    OrcFileFormat.class,
+                    partitionSchema);
+        } else if (formatTable.format() == FormatTable.Format.PARQUET) {
+            return new PartitionedParquetTable(
+                    ident.name(),
+                    spark,
+                    dsOptions,
+                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+                    scala.Option.apply(schema),
+                    ParquetFileFormat.class,
+                    partitionSchema);
+        } else if (formatTable.format() == FormatTable.Format.JSON) {
+            return new PartitionedJsonTable(
+                    ident.name(),
+                    spark,
+                    dsOptions,
+                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+                    scala.Option.apply(schema),
+                    JsonFileFormat.class,
+                    partitionSchema);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported format table "
+                            + ident.name()
+                            + " format "
+                            + formatTable.format().name());
+        }
+    }
+
+    protected List<String> convertPartitionTransforms(Transform[] transforms) {
+        List<String> partitionColNames = new ArrayList<>(transforms.length);
+        for (Transform transform : transforms) {
+            if (!(transform instanceof IdentityTransform)) {
+                throw new UnsupportedOperationException(
+                        "Unsupported partition transform: " + transform);
+            }
+            NamedReference ref = ((IdentityTransform) transform).ref();
+            if (!(ref instanceof FieldReference || ref.fieldNames().length != 1)) {
+                throw new UnsupportedOperationException(
+                        "Unsupported partition transform: " + transform);
+            }
+            partitionColNames.add(ref.fieldNames()[0]);
+        }
+        return partitionColNames;
+    }
+
     private PropertyChange toPropertyChange(NamespaceChange change) {
         if (change instanceof NamespaceChange.SetProperty) {
             NamespaceChange.SetProperty set = (NamespaceChange.SetProperty) change;
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index e4563c492f..098b73a50b 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -93,6 +93,8 @@ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExte
         return this.sparkCatalog.paimonCatalog();
     }
 
+    // ======================= database methods ===============================
+
     @Override
     public String[] defaultNamespace() {
         return asNamespaceCatalog().defaultNamespace();
@@ -149,6 +151,8 @@ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExte
         return asNamespaceCatalog().dropNamespace(namespace, cascade);
     }
 
+    // ======================= table methods ===============================
+
     @Override
     public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
         // delegate to the session catalog because all tables share the same namespace
@@ -354,6 +358,8 @@ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExte
         return (FunctionCatalog) getDelegateCatalog();
     }
 
+    // ======================= Function methods ===============================
+
     @Override
     public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
         try {
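One behavioral note on convertPartitionTransforms, re-added in the SparkCatalog hunk above: only identity partition transforms are accepted, so a PARTITIONED BY (dt) clause yields the plain column name while bucket or function transforms are rejected. A minimal sketch using the Spark connector expression API (column names hypothetical):

    import org.apache.spark.sql.connector.expressions.Expressions;
    import org.apache.spark.sql.connector.expressions.Transform;

    Transform identity = Expressions.identity("dt"); // accepted, maps to column "dt"
    Transform bucket = Expressions.bucket(16, "id"); // rejected: UnsupportedOperationException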