This is an automated email from the ASF dual-hosted git repository. yuzelin pushed a commit to branch release-1.2 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit f8d89e67cb57d520f321107622293f09ce10789f Author: Yann Byron <[email protected]> AuthorDate: Thu Jun 12 13:53:56 2025 +0800 [spark] Upgrade spark version to 4.0.0 (#5711) (cherry picked from commit 35539360c9fbb43bfb3fa469421112bf52d4ae75) --- .../paimon/spark/sql/PaimonOptimizationTest.scala | 2 +- .../paimon/spark/sql/PaimonOptimizationTest.scala | 2 +- .../paimon/spark/sql/PaimonOptimizationTest.scala | 2 +- paimon-spark/paimon-spark-4.0/pom.xml | 8 ++- .../paimon/spark/sql/PaimonOptimizationTest.scala | 6 +- .../java/org/apache/paimon/spark/SparkCatalog.java | 12 ++-- .../apache/paimon/spark/SparkGenericCatalog.java | 5 +- .../paimon/spark/SparkInternalRowWrapper.java | 4 +- .../java/org/apache/paimon/spark/SparkRow.java | 4 +- .../paimon/spark/procedure/BaseProcedure.java | 8 +-- .../scala/org/apache/paimon/spark/ScanHelper.scala | 4 +- .../org/apache/paimon/spark/SparkSource.scala | 4 +- .../org/apache/paimon/spark/SparkTypeUtils.java | 8 +-- .../analysis/expressions/ExpressionHelper.scala | 4 +- .../optimizer/EvalSubqueriesForDeleteTable.scala | 12 ++-- .../MergePaimonScalarSubqueriesBase.scala | 15 +++-- .../spark/commands/UpdatePaimonTableCommand.scala | 5 +- .../apache/paimon/spark/data/SparkArrayData.scala | 2 +- .../paimon/spark/data/SparkInternalRow.scala | 2 +- .../paimon/spark/execution/PaimonStrategy.scala | 6 +- .../extensions/PaimonSparkSessionExtensions.scala | 7 +-- .../spark/procedure/SparkOrphanFilesClean.scala | 4 +- .../procedure/SparkRemoveUnexistingFiles.scala | 4 +- .../org/apache/spark/sql/PaimonStatsUtils.scala | 3 +- .../scala/org/apache/spark/sql/PaimonUtils.scala | 5 +- .../AbstractPaimonSparkSqlExtensionsParser.scala | 4 +- .../sql/connector/catalog/PaimonCatalogUtils.scala | 4 +- .../shim/PaimonCreateTableAsSelectStrategy.scala | 6 +- .../spark/sql/paimon/PaimonSparkSession.scala} | 14 +---- .../{SparkShimLoader.scala => ClassicApi.scala} | 36 ++++++------ .../apache/spark/sql/paimon/shims/SparkShim.scala | 17 +++--- .../spark/sql/paimon/shims/SparkShimLoader.scala | 2 +- .../paimon/spark/SparkCatalogWithHiveTest.java | 10 ++-- .../paimon/spark/SparkGenericCatalogTest.java | 3 +- .../spark/SparkGenericCatalogWithHiveTest.java | 5 +- .../spark/extensions/CallStatementParserTest.java | 25 +++++---- .../paimon/spark/procedure/ProcedureTestBase.scala | 10 +++- .../spark/sql/DDLWithHiveCatalogTestBase.scala | 64 ++++++++++++---------- .../spark/sql/PaimonOptimizationTestBase.scala | 1 + .../paimon/spark/sql/SparkVersionSupport.scala | 5 +- .../scala/org/apache/spark/sql/paimon/Utils.scala | 3 +- .../spark/sql/paimon/shims/Classic3Api.scala | 55 +++++++++++++++++++ .../apache/spark/sql/paimon/shims/Spark3Shim.scala | 21 ++++--- paimon-spark/paimon-spark4-common/pom.xml | 4 ++ .../PaimonSpark4SqlExtensionsParser.scala | 5 +- .../spark/sql/paimon/shims/Classic4Api.scala | 63 +++++++++++++++++++++ .../apache/spark/sql/paimon/shims/Spark4Shim.scala | 26 +++++---- pom.xml | 4 +- 48 files changed, 345 insertions(+), 180 deletions(-) diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala index df15d166a6..692c1f6e92 100644 --- a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala +++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala @@ -30,7 +30,7 @@ class PaimonOptimizationTest extends PaimonOptimizationTestBase { fieldIndex: Int): NamedExpression = { GetStructField( ScalarSubquery( - SparkShimLoader.getSparkShim + SparkShimLoader.shim .createCTERelationRef(cteIndex, resolved = true, output.toSeq, isStreaming = false)), fieldIndex) .as("scalarsubquery()") diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala index d2a20d6df3..9bf2c684ec 100644 --- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala @@ -31,7 +31,7 @@ class PaimonOptimizationTest extends PaimonOptimizationTestBase { fieldIndex: Int): NamedExpression = { GetStructField( ScalarSubquery( - SparkShimLoader.getSparkShim + SparkShimLoader.shim .createCTERelationRef(cteIndex, resolved = true, output.toSeq, isStreaming = false)), fieldIndex) .as("scalarsubquery()") diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala index df15d166a6..692c1f6e92 100644 --- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala +++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala @@ -30,7 +30,7 @@ class PaimonOptimizationTest extends PaimonOptimizationTestBase { fieldIndex: Int): NamedExpression = { GetStructField( ScalarSubquery( - SparkShimLoader.getSparkShim + SparkShimLoader.shim .createCTERelationRef(cteIndex, resolved = true, output.toSeq, isStreaming = false)), fieldIndex) .as("scalarsubquery()") diff --git a/paimon-spark/paimon-spark-4.0/pom.xml b/paimon-spark/paimon-spark-4.0/pom.xml index 11ae457db5..20b5953138 100644 --- a/paimon-spark/paimon-spark-4.0/pom.xml +++ b/paimon-spark/paimon-spark-4.0/pom.xml @@ -32,7 +32,7 @@ under the License. <name>Paimon : Spark : 4.0</name> <properties> - <spark.version>4.0.0-preview2</spark.version> + <spark.version>4.0.0</spark.version> </properties> <dependencies> @@ -81,6 +81,12 @@ under the License. <version>${spark.version}</version> <classifier>tests</classifier> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-connect-shims_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala index bbba0b0197..ec140a89bb 100644 --- a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala +++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala @@ -21,6 +21,7 @@ package org.apache.paimon.spark.sql import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions.{Attribute, GetStructField, NamedExpression, ScalarSubquery} import org.apache.spark.sql.paimon.shims.SparkShimLoader + class PaimonOptimizationTest extends PaimonOptimizationTestBase { override def extractorExpression( @@ -29,9 +30,10 @@ class PaimonOptimizationTest extends PaimonOptimizationTestBase { fieldIndex: Int): NamedExpression = { GetStructField( ScalarSubquery( - SparkShimLoader.getSparkShim + SparkShimLoader.shim .createCTERelationRef(cteIndex, resolved = true, output.toSeq, isStreaming = false)), - fieldIndex) + fieldIndex, + None) .as("scalarsubquery()") } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index df03286fd0..695f03d9ca 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -38,6 +38,7 @@ import org.apache.paimon.table.FormatTableOptions; import org.apache.paimon.types.DataField; import org.apache.paimon.utils.TypeUtils; +import org.apache.spark.sql.PaimonSparkSession$; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; @@ -110,7 +111,7 @@ public class SparkCatalog extends SparkBaseCatalog CatalogContext catalogContext = CatalogContext.create( Options.fromMap(options), - SparkSession.active().sessionState().newHadoopConf()); + PaimonSparkSession$.MODULE$.active().sessionState().newHadoopConf()); this.catalog = CatalogFactory.createCatalog(catalogContext); this.defaultDatabase = options.getOrDefault(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue()); @@ -475,6 +476,7 @@ public class SparkCatalog extends SparkBaseCatalog } private static FileTable convertToFileTable(Identifier ident, FormatTable formatTable) { + SparkSession spark = PaimonSparkSession$.MODULE$.active(); StructType schema = SparkTypeUtils.fromPaimonRowType(formatTable.rowType()); StructType partitionSchema = SparkTypeUtils.fromPaimonRowType( @@ -488,7 +490,7 @@ public class SparkCatalog extends SparkBaseCatalog dsOptions = new CaseInsensitiveStringMap(options.toMap()); return new PartitionedCSVTable( ident.name(), - SparkSession.active(), + spark, dsOptions, scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(), scala.Option.apply(schema), @@ -497,7 +499,7 @@ public class SparkCatalog extends SparkBaseCatalog } else if (formatTable.format() == FormatTable.Format.ORC) { return new PartitionedOrcTable( ident.name(), - SparkSession.active(), + spark, dsOptions, scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(), scala.Option.apply(schema), @@ -506,7 +508,7 @@ public class SparkCatalog extends SparkBaseCatalog } else if (formatTable.format() == FormatTable.Format.PARQUET) { return new PartitionedParquetTable( ident.name(), - SparkSession.active(), + spark, dsOptions, scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(), scala.Option.apply(schema), @@ -515,7 +517,7 @@ public class SparkCatalog extends SparkBaseCatalog } else if (formatTable.format() == FormatTable.Format.JSON) { return new PartitionedJsonTable( ident.name(), - SparkSession.active(), + spark, dsOptions, scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(), scala.Option.apply(schema), diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java index ac1543f2fe..e4563c492f 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java @@ -27,6 +27,7 @@ import org.apache.paimon.utils.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.spark.SparkConf; +import org.apache.spark.sql.PaimonSparkSession$; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; @@ -202,7 +203,7 @@ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExte return sparkCatalog.createTable(ident, schema, partitions, properties); } else { // delegate to the session catalog - return SparkShimLoader.getSparkShim() + return SparkShimLoader.shim() .createTable(asTableCatalog(), ident, schema, partitions, properties); } } @@ -238,7 +239,7 @@ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExte @Override public final void initialize(String name, CaseInsensitiveStringMap options) { - SparkSession sparkSession = SparkSession.active(); + SparkSession sparkSession = PaimonSparkSession$.MODULE$.active(); SessionState sessionState = sparkSession.sessionState(); Configuration hadoopConf = sessionState.newHadoopConf(); if (options.containsKey(METASTORE.key()) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java index 9c9569c573..f42b1fa495 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java @@ -152,7 +152,7 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable { @Override public Variant getVariant(int pos) { - return SparkShimLoader.getSparkShim().toPaimonVariant(internalRow, pos); + return SparkShimLoader.shim().toPaimonVariant(internalRow, pos); } @Override @@ -307,7 +307,7 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable { @Override public Variant getVariant(int pos) { - return SparkShimLoader.getSparkShim().toPaimonVariant(arrayData, pos); + return SparkShimLoader.shim().toPaimonVariant(arrayData, pos); } @Override diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java index 7d0d8ceb22..0fb3e2bdb3 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java @@ -147,7 +147,7 @@ public class SparkRow implements InternalRow, Serializable { @Override public Variant getVariant(int i) { - return SparkShimLoader.getSparkShim().toPaimonVariant(row.getAs(i)); + return SparkShimLoader.shim().toPaimonVariant(row.getAs(i)); } @Override @@ -309,7 +309,7 @@ public class SparkRow implements InternalRow, Serializable { @Override public Variant getVariant(int i) { - return SparkShimLoader.getSparkShim().toPaimonVariant(getAs(i)); + return SparkShimLoader.shim().toPaimonVariant(getAs(i)); } @Override diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java index fe9d01971d..2bf38c9fc7 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java @@ -22,6 +22,7 @@ import org.apache.paimon.spark.SparkTable; import org.apache.paimon.spark.SparkUtils; import org.apache.paimon.utils.Preconditions; +import org.apache.spark.sql.PaimonSparkSession$; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; @@ -30,8 +31,8 @@ import org.apache.spark.sql.connector.catalog.CatalogPlugin; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.Table; import org.apache.spark.sql.connector.catalog.TableCatalog; -import org.apache.spark.sql.execution.CacheManager; import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation; +import org.apache.spark.sql.paimon.shims.SparkShimLoader; import java.util.function.Function; @@ -44,7 +45,7 @@ abstract class BaseProcedure implements Procedure { private final TableCatalog tableCatalog; protected BaseProcedure(TableCatalog tableCatalog) { - this.spark = SparkSession.active(); + this.spark = PaimonSparkSession$.MODULE$.active(); this.tableCatalog = tableCatalog; } @@ -114,10 +115,9 @@ abstract class BaseProcedure implements Procedure { } protected void refreshSparkCache(Identifier ident, Table table) { - CacheManager cacheManager = spark.sharedState().cacheManager(); DataSourceV2Relation relation = DataSourceV2Relation.create(table, Option.apply(tableCatalog), Option.apply(ident)); - cacheManager.recacheByPlan(spark, relation); + SparkShimLoader.shim().classicApi().recacheByPlan(spark, relation); } protected InternalRow newInternalRow(Object... values) { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala index e68407903d..0d3282b621 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala @@ -23,14 +23,14 @@ import org.apache.paimon.io.DataFileMeta import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split} import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{PaimonSparkSession, SparkSession} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer trait ScanHelper extends Logging { - private val spark = SparkSession.active + private val spark = PaimonSparkSession.active val coreOptions: CoreOptions diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala index cdcd2a1668..979191cfab 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala @@ -29,7 +29,7 @@ import org.apache.paimon.table.{DataTable, FileStoreTable, FileStoreTableFactory import org.apache.paimon.table.FormatTable.Format import org.apache.paimon.table.system.AuditLogTable -import org.apache.spark.sql.{DataFrame, SaveMode => SparkSaveMode, SparkSession, SQLContext} +import org.apache.spark.sql.{DataFrame, PaimonSparkSession, SaveMode => SparkSaveMode, SparkSession, SQLContext} import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.streaming.Sink @@ -90,7 +90,7 @@ class SparkSource options, extractCatalogName().getOrElse(NAME), Identifier.create(CatalogUtils.database(path), CatalogUtils.table(path)))), - SparkSession.active.sessionState.newHadoopConf() + PaimonSparkSession.active.sessionState.newHadoopConf() ) val table = FileStoreTableFactory.create(catalogContext) if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java similarity index 99% rename from paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java index f72924edce..ae95881621 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java @@ -221,7 +221,7 @@ public class SparkTypeUtils { @Override public DataType visit(VariantType variantType) { - return SparkShimLoader.getSparkShim().SparkVariantType(); + return SparkShimLoader.shim().SparkVariantType(); } @Override @@ -365,12 +365,12 @@ public class SparkTypeUtils { return new FloatType(); } else if (atomic instanceof org.apache.spark.sql.types.DoubleType) { return new DoubleType(); - } else if (atomic instanceof org.apache.spark.sql.types.StringType) { - return new VarCharType(VarCharType.MAX_LENGTH); } else if (atomic instanceof org.apache.spark.sql.types.VarcharType) { return new VarCharType(((org.apache.spark.sql.types.VarcharType) atomic).length()); } else if (atomic instanceof org.apache.spark.sql.types.CharType) { return new CharType(((org.apache.spark.sql.types.CharType) atomic).length()); + } else if (atomic instanceof org.apache.spark.sql.types.StringType) { + return new VarCharType(VarCharType.MAX_LENGTH); } else if (atomic instanceof org.apache.spark.sql.types.DateType) { return new DateType(); } else if (atomic instanceof org.apache.spark.sql.types.TimestampType) { @@ -388,7 +388,7 @@ public class SparkTypeUtils { } else if (atomic instanceof org.apache.spark.sql.types.TimestampNTZType) { // Move TimestampNTZType to the end for compatibility with spark3.3 and below return new TimestampType(); - } else if (SparkShimLoader.getSparkShim().isSparkVariantType(atomic)) { + } else if (SparkShimLoader.shim().isSparkVariantType(atomic)) { return new VariantType(); } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala index 682cf88fcf..5540f58e0e 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala @@ -108,11 +108,11 @@ trait ExpressionHelperBase extends PredicateHelper { } def toColumn(expr: Expression): Column = { - SparkShimLoader.getSparkShim.column(expr) + SparkShimLoader.shim.classicApi.column(expr) } def toExpression(spark: SparkSession, col: Column): Expression = { - SparkShimLoader.getSparkShim.convertToExpression(spark, col) + SparkShimLoader.shim.classicApi.expression(spark, col) } protected def resolveExpression( diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala index 4cf9284f97..66f8a10f37 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala @@ -22,12 +22,13 @@ import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand import org.apache.spark.internal.Logging -import org.apache.spark.sql.{execution, SparkSession} +import org.apache.spark.sql.{execution, PaimonSparkSession, SparkSession} import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.expressions.{Expression, In, InSubquery, Literal, ScalarSubquery, SubqueryExpression} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ExecSubqueryExpression, QueryExecution} +import org.apache.spark.sql.paimon.shims.SparkShimLoader import org.apache.spark.sql.types.BooleanType import scala.collection.JavaConverters._ @@ -42,7 +43,7 @@ import scala.collection.JavaConverters._ */ object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with ExpressionHelper with Logging { - lazy val spark: SparkSession = SparkSession.active + lazy val spark: SparkSession = PaimonSparkSession.active lazy val resolver: Resolver = spark.sessionState.conf.resolver override def apply(plan: LogicalPlan): LogicalPlan = { @@ -75,7 +76,8 @@ object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with ExpressionHel throw new RuntimeException("Correlated InSubquery is not supported") } - val executedPlan = QueryExecution.prepareExecutedPlan(spark, listQuery.plan) + val executedPlan = + SparkShimLoader.shim.classicApi.prepareExecutedPlan(spark, listQuery.plan) val physicalSubquery = execution.InSubqueryExec( expr, execution.SubqueryExec(s"subquery#${listQuery.exprId.id}", executedPlan), @@ -83,7 +85,7 @@ object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with ExpressionHel evalPhysicalSubquery(physicalSubquery) physicalSubquery.values() match { - case Some(l) if l.length > 0 => In(expr, l.map(Literal(_, expr.dataType))) + case Some(l) if l.length > 0 => In(expr, l.map(Literal(_, expr.dataType)).toSeq) case _ => Literal(false, BooleanType) } @@ -92,7 +94,7 @@ object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with ExpressionHel throw new RuntimeException("Correlated ScalarSubquery is not supported") } - val executedPlan = QueryExecution.prepareExecutedPlan(spark, s.plan) + val executedPlan = SparkShimLoader.shim.classicApi.prepareExecutedPlan(spark, s.plan) val physicalSubquery = execution.ScalarSubquery( execution.SubqueryExec .createForScalarSubquery(s"scalar-subquery#${s.exprId.id}", executedPlan), diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala index 95ee8e86b3..e42ac1cc42 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala @@ -282,7 +282,7 @@ trait MergePaimonScalarSubqueriesBase extends Rule[LogicalPlan] with PredicateHe Some(scan2) } else { val mergedRequiredSchema = StructType( - (scan2.requiredSchema.fields.toSet ++ scan1.requiredSchema.fields.toSet).toSeq) + (scan2.requiredSchema.fields.toSet ++ scan1.requiredSchema.fields.toSet).toArray) Some(scan2.copy(requiredSchema = mergedRequiredSchema)) } } else { @@ -334,7 +334,7 @@ trait MergePaimonScalarSubqueriesBase extends Rule[LogicalPlan] with PredicateHe // Only allow aggregates of the same implementation because merging different implementations // could cause performance regression. - private def supportedAggregateMerge(newPlan: Aggregate, cachedPlan: Aggregate) = { + private def supportedAggregateMerge(newPlan: Aggregate, cachedPlan: Aggregate): Boolean = { val aggregateExpressionsSeq = Seq(newPlan, cachedPlan).map { plan => plan.aggregateExpressions.flatMap(_.collect { case a: AggregateExpression => a }) } @@ -343,7 +343,7 @@ trait MergePaimonScalarSubqueriesBase extends Rule[LogicalPlan] with PredicateHe val Seq(newPlanSupportsHashAggregate, cachedPlanSupportsHashAggregate) = aggregateExpressionsSeq.zip(groupByExpressionSeq).map { case (aggregateExpressions, groupByExpressions) => - SparkShimLoader.getSparkShim.supportsHashAggregate( + SparkShimLoader.shim.supportsHashAggregate( aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes), groupByExpressions) } @@ -351,8 +351,11 @@ trait MergePaimonScalarSubqueriesBase extends Rule[LogicalPlan] with PredicateHe newPlanSupportsHashAggregate && cachedPlanSupportsHashAggregate || newPlanSupportsHashAggregate == cachedPlanSupportsHashAggregate && { val Seq(newPlanSupportsObjectHashAggregate, cachedPlanSupportsObjectHashAggregate) = - aggregateExpressionsSeq.map( - aggregateExpressions => Aggregate.supportsObjectHashAggregate(aggregateExpressions)) + aggregateExpressionsSeq.zip(groupByExpressionSeq).map { + case (aggregateExpressions, groupByExpressions: Seq[Expression]) => + SparkShimLoader.shim + .supportsObjectHashAggregate(aggregateExpressions, groupByExpressions) + } newPlanSupportsObjectHashAggregate && cachedPlanSupportsObjectHashAggregate || newPlanSupportsObjectHashAggregate == cachedPlanSupportsObjectHashAggregate } @@ -371,7 +374,7 @@ trait MergePaimonScalarSubqueriesBase extends Rule[LogicalPlan] with PredicateHe val subqueryCTE = header.plan.asInstanceOf[CTERelationDef] GetStructField( createScalarSubquery( - SparkShimLoader.getSparkShim.createCTERelationRef( + SparkShimLoader.shim.createCTERelationRef( subqueryCTE.id, resolved = true, subqueryCTE.output, diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala index 74c7e122cd..4c132aae9f 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala @@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, Project, SupportsSubquery} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.functions.lit -import org.apache.spark.sql.paimon.shims.SparkShimLoader case class UpdatePaimonTableCommand( relation: DataSourceV2Relation, @@ -132,7 +131,7 @@ case class UpdatePaimonTableCommand( touchedDataSplits: Array[DataSplit]): Seq[CommitMessage] = { val updateColumns = updateExpressions.zip(relation.output).map { case (update, origin) => - SparkShimLoader.getSparkShim.column(update).as(origin.name, origin.metadata) + toColumn(update).as(origin.name, origin.metadata) } val toUpdateScanRelation = createNewRelation(touchedDataSplits, relation) @@ -155,7 +154,7 @@ case class UpdatePaimonTableCommand( } else { If(condition, update, origin) } - SparkShimLoader.getSparkShim.column(updated).as(origin.name, origin.metadata) + toColumn(updated).as(origin.name, origin.metadata) } val data = createDataset(sparkSession, toUpdateScanRelation).select(updateColumns: _*) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkArrayData.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkArrayData.scala index c6539a493c..790d273d0c 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkArrayData.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkArrayData.scala @@ -113,6 +113,6 @@ abstract class AbstractSparkArrayData extends SparkArrayData { object SparkArrayData { def create(elementType: PaimonDataType): SparkArrayData = { - SparkShimLoader.getSparkShim.createSparkArrayData(elementType) + SparkShimLoader.shim.createSparkArrayData(elementType) } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala index f3e607e9d7..b0916447c0 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala @@ -30,7 +30,7 @@ abstract class SparkInternalRow extends InternalRow { object SparkInternalRow { def create(rowType: RowType): SparkInternalRow = { - SparkShimLoader.getSparkShim.createSparkInternalRow(rowType) + SparkShimLoader.shim.createSparkInternalRow(rowType) } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala index f2bfc7846b..ca4c356fa5 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala @@ -23,19 +23,19 @@ import org.apache.paimon.spark.catalog.SupportView import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand} -import org.apache.spark.sql.{SparkSession, Strategy} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.ResolvedNamespace import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, PredicateHelper} import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, DescribeRelation, LogicalPlan, ShowCreateTable} import org.apache.spark.sql.connector.catalog.{Identifier, PaimonLookupCatalog, TableCatalog} -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} import org.apache.spark.sql.execution.shim.PaimonCreateTableAsSelectStrategy import scala.collection.JavaConverters._ case class PaimonStrategy(spark: SparkSession) - extends Strategy + extends SparkStrategy with PredicateHelper with PaimonLookupCatalog { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala index 4ecff93ea6..c68c91ca45 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala @@ -33,16 +33,13 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) { override def apply(extensions: SparkSessionExtensions): Unit = { // parser extensions - extensions.injectParser { - case (_, parser) => SparkShimLoader.getSparkShim.createSparkParser(parser) - } + extensions.injectParser { case (_, parser) => SparkShimLoader.shim.createSparkParser(parser) } // analyzer extensions extensions.injectResolutionRule(spark => new PaimonAnalysis(spark)) extensions.injectResolutionRule(spark => PaimonProcedureResolver(spark)) extensions.injectResolutionRule(spark => PaimonViewResolver(spark)) - extensions.injectResolutionRule( - spark => SparkShimLoader.getSparkShim.createCustomResolution(spark)) + extensions.injectResolutionRule(spark => SparkShimLoader.shim.createCustomResolution(spark)) extensions.injectResolutionRule(spark => PaimonIncompatibleResolutionRules(spark)) extensions.injectPostHocResolutionRule(spark => ReplacePaimonFunctions(spark)) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala index 010a3e4ede..9e435d8b44 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala @@ -29,7 +29,7 @@ import org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX import org.apache.paimon.utils.SerializableConsumer import org.apache.spark.internal.Logging -import org.apache.spark.sql.{functions, Dataset, SparkSession} +import org.apache.spark.sql.{functions, Dataset, PaimonSparkSession, SparkSession} import org.apache.spark.sql.catalyst.SQLConfHelper import java.util @@ -199,7 +199,7 @@ object SparkOrphanFilesClean extends SQLConfHelper { olderThanMillis: Long, parallelismOpt: Integer, dryRun: Boolean): CleanOrphanFilesResult = { - val spark = SparkSession.active + val spark = PaimonSparkSession.active val parallelism = if (parallelismOpt == null) { Math.max(spark.sparkContext.defaultParallelism, conf.numShufflePartitions) } else { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala index f28a12824e..5361d4eafa 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala @@ -27,7 +27,7 @@ import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl, CommitMessageSerializer} import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{PaimonSparkSession, SparkSession} import org.apache.spark.sql.catalyst.SQLConfHelper import java.util @@ -110,7 +110,7 @@ object SparkRemoveUnexistingFiles extends SQLConfHelper { tableName: String, dryRun: Boolean, parallelismOpt: Integer): Array[String] = { - val spark = SparkSession.active + val spark = PaimonSparkSession.active val parallelism = if (parallelismOpt == null) { Math.max(spark.sparkContext.defaultParallelism, conf.numShufflePartitions) } else { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala index 8f24700c29..5553b6d8ef 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonStatsUtils.scala @@ -21,6 +21,7 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.paimon.shims.SparkShimLoader import org.apache.spark.sql.types._ /** @@ -34,7 +35,7 @@ object PaimonStatsUtils { sparkSession: SparkSession, relation: LogicalPlan, columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = { - CommandUtils.computeColumnStats(sparkSession, relation, columns) + SparkShimLoader.shim.classicApi.computeColumnStats(sparkSession, relation, columns) } /** [[IntegralType]] is private in spark, therefore we need add it here. */ diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala index 9023bfa646..885910dbc2 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.connector.expressions.filter.Predicate import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.translateFilterV2WithMapping import org.apache.spark.sql.internal.connector.PredicateUtils +import org.apache.spark.sql.paimon.shims.SparkShimLoader import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.util.PartitioningUtils @@ -54,11 +55,11 @@ object PaimonUtils { * [[org.apache.spark.sql.execution.streaming.Sink.addBatch]]. */ def createNewDataFrame(data: DataFrame): DataFrame = { - data.sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, data.schema) + SparkShimLoader.shim.classicApi.createDataset(data) } def createDataset(sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[Row] = { - Dataset.ofRows(sparkSession, logicalPlan) + SparkShimLoader.shim.classicApi.createDataset(sparkSession, logicalPlan) } def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): Seq[Expression] = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala index 557b0735c7..73e1ea3ec8 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala @@ -25,7 +25,7 @@ import org.antlr.v4.runtime.atn.PredictionMode import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException} import org.antlr.v4.runtime.tree.TerminalNodeImpl import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.{AnalysisException, PaimonSparkSession, SparkSession} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface} @@ -65,7 +65,7 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf parse(sqlTextAfterSubstitution)(parser => astBuilder.visit(parser.singleStatement())) .asInstanceOf[LogicalPlan] } else { - RewritePaimonViewCommands(SparkSession.active).apply(delegate.parsePlan(sqlText)) + RewritePaimonViewCommands(PaimonSparkSession.active).apply(delegate.parsePlan(sqlText)) } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala index 5db6894ba0..f330fed3f3 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.connector.catalog import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{PaimonSparkSession, SparkSession} import org.apache.spark.sql.catalyst.catalog.ExternalCatalog import org.apache.spark.sql.connector.catalog.CatalogV2Util import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION @@ -30,7 +30,7 @@ object PaimonCatalogUtils { def buildExternalCatalog(conf: SparkConf, hadoopConf: Configuration): ExternalCatalog = { val externalCatalogClassName = - if (SparkSession.active.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive")) { + if (PaimonSparkSession.active.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive")) { "org.apache.spark.sql.hive.HiveExternalCatalog" } else { "org.apache.spark.sql.catalyst.catalog.InMemoryCatalog" diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala index 8279a6de31..fd6627c095 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala @@ -22,17 +22,17 @@ import org.apache.paimon.CoreOptions import org.apache.paimon.spark.SparkCatalog import org.apache.paimon.spark.catalog.FormatTableCatalog -import org.apache.spark.sql.{SparkSession, Strategy} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec} import org.apache.spark.sql.connector.catalog.StagingTableCatalog -import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan} +import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan, SparkStrategy} import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec import scala.collection.JavaConverters._ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) - extends Strategy + extends SparkStrategy with PaimonStrategyHelper { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/PaimonSparkSession.scala similarity index 68% copy from paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala copy to paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/PaimonSparkSession.scala index 647b4cfdca..674a9196f7 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/PaimonSparkSession.scala @@ -16,18 +16,10 @@ * limitations under the License. */ -package org.apache.paimon.spark.sql +package org.apache.spark.sql -import org.apache.spark.SPARK_VERSION +object PaimonSparkSession { -trait SparkVersionSupport { - lazy val sparkVersion: String = SPARK_VERSION + def active: SparkSession = SparkSession.active - lazy val gteqSpark3_3: Boolean = sparkVersion >= "3.3" - - lazy val gteqSpark3_4: Boolean = sparkVersion >= "3.4" - - lazy val gteqSpark3_5: Boolean = sparkVersion >= "3.5" - - lazy val gteqSpark4_0: Boolean = sparkVersion >= "4.0" } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/ClassicApi.scala similarity index 50% copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala copy to paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/ClassicApi.scala index 920896547a..21381cca29 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/ClassicApi.scala @@ -18,26 +18,28 @@ package org.apache.spark.sql.paimon.shims -import java.util.ServiceLoader +import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.execution.SparkPlan -import scala.collection.JavaConverters._ +trait ClassicApi { -/** Load a [[SparkShim]]'s implementation. */ -object SparkShimLoader { + def column(expression: Expression): Column - private lazy val sparkShim: SparkShim = loadSparkShim() + def expression(spark: SparkSession, column: Column): Expression - def getSparkShim: SparkShim = { - sparkShim - } + def createDataset(data: DataFrame): DataFrame + + def createDataset(sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[Row] + + def prepareExecutedPlan(spark: SparkSession, logicalPlan: LogicalPlan): SparkPlan + + def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit + + def computeColumnStats( + sparkSession: SparkSession, + relation: LogicalPlan, + columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) - private def loadSparkShim(): SparkShim = { - val shims = ServiceLoader.load(classOf[SparkShim]).asScala - if (shims.isEmpty) { - throw new IllegalStateException("No available spark shim here.") - } else if (shims.size > 1) { - throw new IllegalStateException("Found more than one spark shim here.") - } - shims.head - } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala index 6b771a3339..ef764bc0d1 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala @@ -22,9 +22,10 @@ import org.apache.paimon.data.variant.Variant import org.apache.paimon.spark.data.{SparkArrayData, SparkInternalRow} import org.apache.paimon.types.{DataType, RowType} -import org.apache.spark.sql.{Column, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.{CTERelationRef, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule @@ -42,6 +43,8 @@ import java.util.{Map => JMap} */ trait SparkShim { + def classicApi: ClassicApi + def createSparkParser(delegate: ParserInterface): ParserInterface def createCustomResolution(spark: SparkSession): Rule[LogicalPlan] @@ -50,10 +53,6 @@ trait SparkShim { def createSparkArrayData(elementType: DataType): SparkArrayData - def supportsHashAggregate( - aggregateBufferAttributes: Seq[Attribute], - groupingExpression: Seq[Expression]): Boolean - def createTable( tableCatalog: TableCatalog, ident: Identifier, @@ -67,9 +66,13 @@ trait SparkShim { output: Seq[Attribute], isStreaming: Boolean): CTERelationRef - def column(expr: Expression): Column + def supportsHashAggregate( + aggregateBufferAttributes: Seq[Attribute], + groupingExpression: Seq[Expression]): Boolean - def convertToExpression(spark: SparkSession, column: Column): Expression + def supportsObjectHashAggregate( + aggregateExpressions: Seq[AggregateExpression], + groupByExpressions: Seq[Expression]): Boolean // for variant def toPaimonVariant(o: Object): Variant diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala index 920896547a..d6b5850b9d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala @@ -27,7 +27,7 @@ object SparkShimLoader { private lazy val sparkShim: SparkShim = loadSparkShim() - def getSparkShim: SparkShim = { + def shim: SparkShim = { sparkShim } diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java index 488913f14a..9f383a54c2 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.FileNotFoundException; +import java.io.IOException; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -56,8 +57,9 @@ public class SparkCatalogWithHiveTest { } @Test - public void testCreateFormatTable() { - try (SparkSession spark = createSessionBuilder().getOrCreate()) { + public void testCreateFormatTable() throws IOException { + SparkSession spark = createSessionBuilder().getOrCreate(); + { spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); spark.sql("USE spark_catalog.my_db1"); @@ -110,7 +112,7 @@ public class SparkCatalogWithHiveTest { } @Test - public void testSpecifyHiveConfDirInGenericCatalog() { + public void testSpecifyHiveConfDirInGenericCatalog() throws IOException { try (SparkSession spark = createSessionBuilder() .config("spark.sql.catalog.spark_catalog.hive-conf-dir", "nonExistentPath") @@ -126,7 +128,7 @@ public class SparkCatalogWithHiveTest { } @Test - public void testCreateExternalTable() { + public void testCreateExternalTable() throws IOException { try (SparkSession spark = createSessionBuilder().getOrCreate()) { String warehousePath = spark.sparkContext().conf().get("spark.sql.warehouse.dir"); spark.sql("CREATE DATABASE IF NOT EXISTS test_db"); diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java index 0ae0e91306..b38995da14 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java @@ -28,6 +28,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; @@ -82,7 +83,7 @@ public class SparkGenericCatalogTest { } @Test - public void testSparkSessionReload() { + public void testSparkSessionReload() throws IOException { spark.sql("CREATE DATABASE my_db"); spark.close(); diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java index 604e2ea279..3bb013648e 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java @@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.FileNotFoundException; +import java.io.IOException; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -48,7 +49,7 @@ public class SparkGenericCatalogWithHiveTest { } @Test - public void testBuildWithHive(@TempDir java.nio.file.Path tempDir) { + public void testBuildWithHive(@TempDir java.nio.file.Path tempDir) throws IOException { // firstly, we use hive metastore to create table, and check the result. Path warehousePath = new Path("file:" + tempDir.toString()); SparkSession spark = @@ -105,7 +106,7 @@ public class SparkGenericCatalogWithHiveTest { } @Test - public void testHiveCatalogOptions(@TempDir java.nio.file.Path tempDir) { + public void testHiveCatalogOptions(@TempDir java.nio.file.Path tempDir) throws IOException { Path warehousePath = new Path("file:" + tempDir.toString()); SparkSession spark = SparkSession.builder() diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/CallStatementParserTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/CallStatementParserTest.java index e4e571e96b..9f77a93bf0 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/CallStatementParserTest.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/CallStatementParserTest.java @@ -22,6 +22,7 @@ import org.apache.paimon.spark.catalyst.plans.logical.PaimonCallArgument; import org.apache.paimon.spark.catalyst.plans.logical.PaimonCallStatement; import org.apache.paimon.spark.catalyst.plans.logical.PaimonNamedArgument; import org.apache.paimon.spark.catalyst.plans.logical.PaimonPositionalArgument; +import org.apache.paimon.spark.sql.SparkVersionSupport$; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.expressions.Literal$; @@ -81,16 +82,20 @@ public class CallStatementParserTest { @Test public void testDelegateUnsupportedProcedure() { - assertThatThrownBy(() -> parser.parsePlan("CALL cat.d.t()")) - .isInstanceOf(ParseException.class) - .satisfies( - exception -> { - ParseException parseException = (ParseException) exception; - assertThat(parseException.getErrorClass()) - .isEqualTo("PARSE_SYNTAX_ERROR"); - assertThat(parseException.getMessageParameters().get("error")) - .isEqualTo("'CALL'"); - }); + if (!SparkVersionSupport$.MODULE$.gteqSpark4_0()) { + // TODO: adapt spark 4.0 to make Paimon parser only apply own supported procedures. + + assertThatThrownBy(() -> parser.parsePlan("CALL cat.d.t()")) + .isInstanceOf(ParseException.class) + .satisfies( + exception -> { + ParseException parseException = (ParseException) exception; + assertThat(parseException.getErrorClass()) + .isEqualTo("PARSE_SYNTAX_ERROR"); + assertThat(parseException.getMessageParameters().get("error")) + .isEqualTo("'CALL'"); + }); + } } @Test diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTestBase.scala index a5f9f3ffa0..df98026022 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTestBase.scala @@ -20,6 +20,7 @@ package org.apache.paimon.spark.procedure import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.parser.extensions.PaimonParseException import org.assertj.core.api.Assertions.assertThatThrownBy @@ -31,8 +32,13 @@ abstract class ProcedureTestBase extends PaimonSparkTestBase { |CREATE TABLE T (id INT, name STRING, dt STRING) |""".stripMargin) - assertThatThrownBy(() => spark.sql("CALL sys.unknown_procedure(table => 'test.T')")) - .isInstanceOf(classOf[ParseException]) + if (gteqSpark4_0) { + assertThatThrownBy(() => spark.sql("CALL sys.unknown_procedure(table => 'test.T')")) + .isInstanceOf(classOf[AnalysisException]) + } else { + assertThatThrownBy(() => spark.sql("CALL sys.unknown_procedure(table => 'test.T')")) + .isInstanceOf(classOf[ParseException]) + } } test(s"test parse exception") { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala index 36315f0228..06c2eaf049 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala @@ -285,41 +285,45 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { } test("Paimon DDL with hive catalog: set default database") { - var reusedSpark = spark + if (!gteqSpark4_0) { + // TODO: This is skipped in Spark 4.0, because it would fail in afterAll method, not because the default database is not supported. - Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach { - catalogName => - { - val dbName = s"${catalogName}_default_db" - val tblName = s"${dbName}_tbl" - - reusedSpark.sql(s"use $catalogName") - reusedSpark.sql(s"create database $dbName") - reusedSpark.sql(s"use $dbName") - reusedSpark.sql(s"create table $tblName (id int, name string, dt string) using paimon") - reusedSpark.stop() - - reusedSpark = SparkSession - .builder() - .master("local[2]") - .config(sparkConf) - .config("spark.sql.defaultCatalog", catalogName) - .config(s"spark.sql.catalog.$catalogName.defaultDatabase", dbName) - .getOrCreate() - - if (catalogName.equals(sparkCatalogName) && !gteqSpark3_4) { - checkAnswer(reusedSpark.sql("show tables").select("tableName"), Nil) + var reusedSpark = spark + + Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach { + catalogName => + { + val dbName = s"${catalogName}_default_db" + val tblName = s"${dbName}_tbl" + + reusedSpark.sql(s"use $catalogName") + reusedSpark.sql(s"create database $dbName") reusedSpark.sql(s"use $dbName") + reusedSpark.sql(s"create table $tblName (id int, name string, dt string) using paimon") + reusedSpark.stop() + + reusedSpark = SparkSession + .builder() + .master("local[2]") + .config(sparkConf) + .config("spark.sql.defaultCatalog", catalogName) + .config(s"spark.sql.catalog.$catalogName.defaultDatabase", dbName) + .getOrCreate() + + if (catalogName.equals(sparkCatalogName) && !gteqSpark3_4) { + checkAnswer(reusedSpark.sql("show tables").select("tableName"), Nil) + reusedSpark.sql(s"use $dbName") + } + checkAnswer(reusedSpark.sql("show tables").select("tableName"), Row(tblName) :: Nil) + + reusedSpark.sql(s"drop table $tblName") } - checkAnswer(reusedSpark.sql("show tables").select("tableName"), Row(tblName) :: Nil) + } - reusedSpark.sql(s"drop table $tblName") - } + // Since we created a new sparkContext, we need to stop it and reset the default sparkContext + reusedSpark.stop() + reset() } - - // Since we created a new sparkContext, we need to stop it and reset the default sparkContext - reusedSpark.stop() - reset() } test("Paimon DDL with hive catalog: drop database cascade which contains paimon table") { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala index 87f4c94486..e5f1d0e131 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, LogicalPlan, import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.functions._ import org.apache.spark.sql.paimon.Utils +import org.apache.spark.sql.paimon.shims.SparkShimLoader import org.junit.jupiter.api.Assertions import scala.collection.immutable diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala index 647b4cfdca..9dadb26e25 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala @@ -21,7 +21,8 @@ package org.apache.paimon.spark.sql import org.apache.spark.SPARK_VERSION trait SparkVersionSupport { - lazy val sparkVersion: String = SPARK_VERSION + + val sparkVersion: String = SPARK_VERSION lazy val gteqSpark3_3: Boolean = sparkVersion >= "3.3" @@ -31,3 +32,5 @@ trait SparkVersionSupport { lazy val gteqSpark4_0: Boolean = sparkVersion >= "4.0" } + +object SparkVersionSupport extends SparkVersionSupport {} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala index 03f1c7706e..61a479b9f0 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.paimon import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.paimon.shims.SparkShimLoader import org.apache.spark.util.{Utils => SparkUtils} import java.io.File @@ -36,7 +37,7 @@ object Utils { } def createDataFrame(sparkSession: SparkSession, plan: LogicalPlan): DataFrame = { - Dataset.ofRows(sparkSession, plan) + SparkShimLoader.shim.classicApi.createDataset(sparkSession, plan) } } diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic3Api.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic3Api.scala new file mode 100644 index 0000000000..b0782c59d6 --- /dev/null +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic3Api.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.paimon.shims + +import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.execution.{QueryExecution, SparkPlan} +import org.apache.spark.sql.execution.command.CommandUtils +class Classic3Api extends ClassicApi { + + override def column(expression: Expression): Column = new Column(expression) + + override def expression(spark: SparkSession, column: Column): Expression = column.expr + + override def createDataset(data: DataFrame): DataFrame = { + data.sqlContext + .internalCreateDataFrame(data.queryExecution.toRdd, data.schema) + } + + override def createDataset(spark: SparkSession, logicalPlan: LogicalPlan): Dataset[Row] = { + Dataset.ofRows(spark, logicalPlan) + } + + override def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = { + spark.sharedState.cacheManager.recacheByPlan(spark, plan) + } + + override def prepareExecutedPlan(spark: SparkSession, logicalPlan: LogicalPlan): SparkPlan = { + QueryExecution.prepareExecutedPlan(spark, logicalPlan) + } + + override def computeColumnStats( + spark: SparkSession, + relation: LogicalPlan, + columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = + CommandUtils.computeColumnStats(spark, relation, columns) + +} diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala index b173a3ff5c..18ba22674d 100644 --- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala @@ -27,18 +27,22 @@ import org.apache.paimon.types.{DataType, RowType} import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.execution.command.CommandUtils import org.apache.spark.sql.types.StructType import java.util.{Map => JMap} class Spark3Shim extends SparkShim { + override def classicApi: ClassicApi = new Classic3Api + override def createSparkParser(delegate: ParserInterface): ParserInterface = { new PaimonSpark3SqlExtensionsParser(delegate) } @@ -55,12 +59,6 @@ class Spark3Shim extends SparkShim { new Spark3ArrayData(elementType) } - override def supportsHashAggregate( - aggregateBufferAttributes: Seq[Attribute], - groupingExpression: Seq[Expression]): Boolean = { - Aggregate.supportsHashAggregate(aggregateBufferAttributes) - } - override def createTable( tableCatalog: TableCatalog, ident: Identifier, @@ -77,9 +75,15 @@ class Spark3Shim extends SparkShim { isStreaming: Boolean): CTERelationRef = MinorVersionShim.createCTERelationRef(cteId, resolved, output, isStreaming) - override def column(expr: Expression): Column = new Column(expr) + override def supportsHashAggregate( + aggregateBufferAttributes: Seq[Attribute], + groupingExpression: Seq[Expression]): Boolean = + Aggregate.supportsHashAggregate(aggregateBufferAttributes) - override def convertToExpression(spark: SparkSession, column: Column): Expression = column.expr + override def supportsObjectHashAggregate( + aggregateExpressions: Seq[AggregateExpression], + groupByExpressions: Seq[Expression]): Boolean = + Aggregate.supportsObjectHashAggregate(aggregateExpressions) override def toPaimonVariant(o: Object): Variant = throw new UnsupportedOperationException() @@ -94,4 +98,5 @@ class Spark3Shim extends SparkShim { override def toPaimonVariant(array: ArrayData, pos: Int): Variant = throw new UnsupportedOperationException() + } diff --git a/paimon-spark/paimon-spark4-common/pom.xml b/paimon-spark/paimon-spark4-common/pom.xml index e839cd45df..e8b7356eca 100644 --- a/paimon-spark/paimon-spark4-common/pom.xml +++ b/paimon-spark/paimon-spark4-common/pom.xml @@ -49,6 +49,10 @@ under the License. <artifactId>spark-sql-api_2.13</artifactId> <version>${spark.version}</version> <exclusions> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-connect-shims_${scala.binary.version}</artifactId> + </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/parser/extensions/PaimonSpark4SqlExtensionsParser.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/parser/extensions/PaimonSpark4SqlExtensionsParser.scala index ef1f5763d2..9bd395f333 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/parser/extensions/PaimonSpark4SqlExtensionsParser.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/parser/extensions/PaimonSpark4SqlExtensionsParser.scala @@ -18,11 +18,12 @@ package org.apache.paimon.spark.catalyst.parser.extensions -import org.apache.spark.sql.catalyst.parser.{CompoundBody, ParserInterface} +import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.parser.extensions.AbstractPaimonSparkSqlExtensionsParser +import org.apache.spark.sql.types.StructType class PaimonSpark4SqlExtensionsParser(override val delegate: ParserInterface) extends AbstractPaimonSparkSqlExtensionsParser(delegate) { - def parseScript(sqlScriptText: String): CompoundBody = delegate.parseScript(sqlScriptText) + override def parseRoutineParam(sqlText: String): StructType = delegate.parseRoutineParam(sqlText) } diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic4Api.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic4Api.scala new file mode 100644 index 0000000000..8dff78f0bc --- /dev/null +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic4Api.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.paimon.shims + +import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} +import org.apache.spark.sql.classic.{ClassicConversions, Dataset => ClassicDataset, ExpressionUtils} +import org.apache.spark.sql.execution.{QueryExecution, SparkPlan} +import org.apache.spark.sql.execution.command.CommandUtils + +/** + * This class is used to implement the conversion from sql-api to classic one. Make sure this is the + * only class that implements [[org.apache.spark.sql.classic.ClassicConversions]] in Paimon-Spark. + */ +class Classic4Api extends ClassicApi with ClassicConversions { + + override def column(expression: Expression): Column = ExpressionUtils.column(expression) + + override def expression(spark: SparkSession, column: Column): Expression = { + spark.expression(column) + } + + override def createDataset(data: DataFrame): DataFrame = { + data.sqlContext + .internalCreateDataFrame(data.queryExecution.toRdd, data.schema) + } + + override def createDataset(spark: SparkSession, logicalPlan: LogicalPlan): Dataset[Row] = { + ClassicDataset.ofRows(spark, logicalPlan) + } + + override def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = { + spark.sharedState.cacheManager.recacheByPlan(spark, plan) + } + + override def prepareExecutedPlan(spark: SparkSession, logicalPlan: LogicalPlan): SparkPlan = { + QueryExecution.prepareExecutedPlan(spark, logicalPlan) + } + + override def computeColumnStats( + spark: SparkSession, + relation: LogicalPlan, + columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = { + CommandUtils.computeColumnStats(spark, relation, columns.toSeq) + } +} diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala index e8e4783138..cb535ef86c 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala @@ -24,16 +24,16 @@ import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensi import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, SparkArrayData, SparkInternalRow} import org.apache.paimon.types.{DataType, RowType} -import org.apache.spark.sql.{Column, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.internal.ExpressionUtils import org.apache.spark.sql.types.{DataTypes, StructType, VariantType} import org.apache.spark.unsafe.types.VariantVal @@ -41,6 +41,8 @@ import java.util.{Map => JMap} class Spark4Shim extends SparkShim { + override def classicApi: ClassicApi = new Classic4Api + override def createSparkParser(delegate: ParserInterface): ParserInterface = { new PaimonSpark4SqlExtensionsParser(delegate) } @@ -57,13 +59,7 @@ class Spark4Shim extends SparkShim { new Spark4ArrayData(elementType) } - def supportsHashAggregate( - aggregateBufferAttributes: Seq[Attribute], - groupingExpression: Seq[Expression]): Boolean = { - Aggregate.supportsHashAggregate(aggregateBufferAttributes, groupingExpression) - } - - def createTable( + override def createTable( tableCatalog: TableCatalog, ident: Identifier, schema: StructType, @@ -81,10 +77,16 @@ class Spark4Shim extends SparkShim { CTERelationRef(cteId, resolved, output.toSeq, isStreaming) } - def column(expr: Expression): Column = ExpressionUtils.column(expr) + override def supportsHashAggregate( + aggregateBufferAttributes: Seq[Attribute], + groupingExpression: Seq[Expression]): Boolean = { + Aggregate.supportsHashAggregate(aggregateBufferAttributes.toSeq, groupingExpression.toSeq) + } - def convertToExpression(spark: SparkSession, column: Column): Expression = - spark.expression(column) + override def supportsObjectHashAggregate( + aggregateExpressions: Seq[AggregateExpression], + groupByExpressions: Seq[Expression]): Boolean = + Aggregate.supportsObjectHashAggregate(aggregateExpressions.toSeq, groupByExpressions.toSeq) override def toPaimonVariant(o: Object): Variant = { val v = o.asInstanceOf[VariantVal] diff --git a/pom.xml b/pom.xml index bfdd189eff..ef045dbcb9 100644 --- a/pom.xml +++ b/pom.xml @@ -400,10 +400,10 @@ under the License. <antlr4.version>4.13.1</antlr4.version> <scala.binary.version>2.13</scala.binary.version> <scala.version>${scala213.version}</scala.version> - <paimon-spark-common.spark.version>4.0.0-preview2</paimon-spark-common.spark.version> + <paimon-spark-common.spark.version>4.0.0</paimon-spark-common.spark.version> <paimon-sparkx-common>paimon-spark4-common</paimon-sparkx-common> <test.spark.main.version>4.0</test.spark.main.version> - <test.spark.version>4.0.0-preview2</test.spark.version> + <test.spark.version>4.0.0</test.spark.version> </properties> <activation> <property>
