This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 19d7f66a0  [spark] Introduce View Support to SparkCatalog (#4538)
19d7f66a0 is described below

commit 19d7f66a006d02ee3ad233937fc6116a07acefbb
Author: Zouxxyy <zouxinyu....@alibaba-inc.com>
AuthorDate: Tue Nov 19 11:02:16 2024 +0800

    [spark] Introduce View Support to SparkCatalog (#4538)
---
 .../extensions/RewritePaimonViewCommands.scala     |  80 ++++++++++++++
 .../apache/paimon/spark/sql/PaimonViewTest.scala}  |  18 +---
 .../extensions/RewritePaimonViewCommands.scala     |  79 ++++++++++++++
 .../apache/paimon/spark/sql/PaimonViewTest.scala}  |  18 +---
 .../apache/paimon/spark/sql/PaimonViewTest.scala}  |  18 +---
 .../apache/paimon/spark/sql/PaimonViewTest.scala}  |  18 +---
 .../apache/paimon/spark/sql/PaimonViewTest.scala}  |  18 +---
 .../java/org/apache/paimon/spark/SparkCatalog.java |  49 ++-------
 .../org/apache/paimon/spark/SparkTypeUtils.java    |   4 +
 .../apache/paimon/spark/catalog/SupportView.java   |  86 +++++++++++++++
 .../apache/paimon/spark/utils/CatalogUtils.java}   |  26 +++--
 .../catalyst/analysis/PaimonViewResolver.scala     |  85 +++++++++++++++
 .../catalyst/plans/logical/PaimonViewCommand.scala |  74 +++++++++++++
 .../paimon/spark/execution/PaimonStrategy.scala    |  37 ++++++-
 .../paimon/spark/execution/PaimonViewExec.scala    | 117 +++++++++++++++++++++
 .../extensions/PaimonSparkSessionExtensions.scala  |   3 +-
 .../org/apache/paimon/spark/leafnode/package.scala |   7 +-
 .../PaimonSparkSqlExtensionsParser.scala           |   4 +-
 .../extensions/PaimonSqlExtensionsAstBuilder.scala |  13 ++-
 .../extensions/RewritePaimonViewCommands.scala     |  77 ++++++++++++++
 .../spark/sql/DDLWithHiveCatalogTestBase.scala     |   4 +-
 .../paimon/spark/sql/PaimonViewTestBase.scala      |  96 +++++++++++++++++
 22 files changed, 794 insertions(+), 137 deletions(-)

diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
new file mode 100644
index 000000000..e759edd0c
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.sql.catalyst.parser.extensions + +import org.apache.paimon.spark.catalog.SupportView +import org.apache.paimon.spark.catalyst.plans.logical.{CreatePaimonView, DropPaimonView, ResolvedIdentifier, ShowPaimonViews} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, ResolvedNamespace, UnresolvedView} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog} + +case class RewritePaimonViewCommands(spark: SparkSession) + extends Rule[LogicalPlan] + with LookupCatalog { + + protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + + case CreateViewStatement( + ResolvedIdent(resolved), + userSpecifiedColumns, + comment, + properties, + Some(originalText), + child, + allowExisting, + replace, + _) => + CreatePaimonView( + child = resolved, + queryText = originalText, + query = CTESubstitution.apply(child), + columnAliases = userSpecifiedColumns.map(_._1), + columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty)), + comment = comment, + properties = properties, + allowExisting = allowExisting, + replace = replace + ) + + case DropView(ResolvedIdent(resolved), ifExists: Boolean) => + DropPaimonView(resolved, ifExists) + + case ShowViews(_, pattern, output) if catalogManager.currentCatalog.isInstanceOf[SupportView] => + ShowPaimonViews( + ResolvedNamespace(catalogManager.currentCatalog, catalogManager.currentNamespace), + pattern, + output) + } + + private object ResolvedIdent { + def unapply(unresolved: Any): Option[ResolvedIdentifier] = unresolved match { + case CatalogAndIdentifier(viewCatalog: SupportView, ident) => + Some(ResolvedIdentifier(viewCatalog, ident)) + case UnresolvedView(CatalogAndIdentifier(viewCatalog: SupportView, ident), _, _, _) => + Some(ResolvedIdentifier(viewCatalog, ident)) + case _ => + None + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala similarity index 60% copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala copy to paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala index 5befb88da..6ab8a2671 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala +++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala @@ -16,20 +16,6 @@ * limitations under the License. 
*/ -package org.apache.paimon.spark +package org.apache.paimon.spark.sql -import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LeafParsedStatement} -import org.apache.spark.sql.execution.command.LeafRunnableCommand -import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec - -package object leafnode { - - trait PaimonLeafParsedStatement extends LeafParsedStatement - - trait PaimonLeafRunnableCommand extends LeafRunnableCommand - - trait PaimonLeafCommand extends LeafCommand - - trait PaimonLeafV2CommandExec extends LeafV2CommandExec - -} +class PaimonViewTest extends PaimonViewTestBase {} diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala new file mode 100644 index 000000000..5d57cda2f --- /dev/null +++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.parser.extensions + +import org.apache.paimon.spark.catalog.SupportView +import org.apache.paimon.spark.catalyst.plans.logical.{CreatePaimonView, DropPaimonView, ResolvedIdentifier, ShowPaimonViews} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, ResolvedNamespace, UnresolvedDBObjectName, UnresolvedView} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog} + +case class RewritePaimonViewCommands(spark: SparkSession) + extends Rule[LogicalPlan] + with LookupCatalog { + + protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + + case CreateView( + ResolvedIdent(resolved), + userSpecifiedColumns, + comment, + properties, + Some(queryText), + query, + allowExisting, + replace) => + CreatePaimonView( + child = resolved, + queryText = queryText, + query = CTESubstitution.apply(query), + columnAliases = userSpecifiedColumns.map(_._1), + columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty)), + comment = comment, + properties = properties, + allowExisting = allowExisting, + replace = replace + ) + + case DropView(ResolvedIdent(resolved), ifExists: Boolean) => + DropPaimonView(resolved, ifExists) + + case ShowViews(_, pattern, output) if catalogManager.currentCatalog.isInstanceOf[SupportView] => + ShowPaimonViews( + ResolvedNamespace(catalogManager.currentCatalog, catalogManager.currentNamespace), + pattern, + output) + } + + private object ResolvedIdent { + def unapply(unresolved: Any): Option[ResolvedIdentifier] = unresolved match { + case UnresolvedDBObjectName(CatalogAndIdentifier(viewCatalog: SupportView, ident), _) => + Some(ResolvedIdentifier(viewCatalog, ident)) + case UnresolvedView(CatalogAndIdentifier(viewCatalog: SupportView, ident), _, _, _) => + Some(ResolvedIdentifier(viewCatalog, ident)) + case _ => + None + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala similarity index 60% copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala copy to paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala index 5befb88da..6ab8a2671 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala +++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala @@ -16,20 +16,6 @@ * limitations under the License. 
*/ -package org.apache.paimon.spark +package org.apache.paimon.spark.sql -import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LeafParsedStatement} -import org.apache.spark.sql.execution.command.LeafRunnableCommand -import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec - -package object leafnode { - - trait PaimonLeafParsedStatement extends LeafParsedStatement - - trait PaimonLeafRunnableCommand extends LeafRunnableCommand - - trait PaimonLeafCommand extends LeafCommand - - trait PaimonLeafV2CommandExec extends LeafV2CommandExec - -} +class PaimonViewTest extends PaimonViewTestBase {} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala similarity index 60% copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala copy to paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala index 5befb88da..6ab8a2671 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala @@ -16,20 +16,6 @@ * limitations under the License. */ -package org.apache.paimon.spark +package org.apache.paimon.spark.sql -import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LeafParsedStatement} -import org.apache.spark.sql.execution.command.LeafRunnableCommand -import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec - -package object leafnode { - - trait PaimonLeafParsedStatement extends LeafParsedStatement - - trait PaimonLeafRunnableCommand extends LeafRunnableCommand - - trait PaimonLeafCommand extends LeafCommand - - trait PaimonLeafV2CommandExec extends LeafV2CommandExec - -} +class PaimonViewTest extends PaimonViewTestBase {} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala similarity index 60% copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala copy to paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala index 5befb88da..6ab8a2671 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala +++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala @@ -16,20 +16,6 @@ * limitations under the License. 
*/ -package org.apache.paimon.spark +package org.apache.paimon.spark.sql -import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LeafParsedStatement} -import org.apache.spark.sql.execution.command.LeafRunnableCommand -import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec - -package object leafnode { - - trait PaimonLeafParsedStatement extends LeafParsedStatement - - trait PaimonLeafRunnableCommand extends LeafRunnableCommand - - trait PaimonLeafCommand extends LeafCommand - - trait PaimonLeafV2CommandExec extends LeafV2CommandExec - -} +class PaimonViewTest extends PaimonViewTestBase {} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala similarity index 60% copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala copy to paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala index 5befb88da..6ab8a2671 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala +++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala @@ -16,20 +16,6 @@ * limitations under the License. */ -package org.apache.paimon.spark +package org.apache.paimon.spark.sql -import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LeafParsedStatement} -import org.apache.spark.sql.execution.command.LeafRunnableCommand -import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec - -package object leafnode { - - trait PaimonLeafParsedStatement extends LeafParsedStatement - - trait PaimonLeafRunnableCommand extends LeafRunnableCommand - - trait PaimonLeafCommand extends LeafCommand - - trait PaimonLeafV2CommandExec extends LeafV2CommandExec - -} +class PaimonViewTest extends PaimonViewTestBase {} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 89448c1f4..3b9af1694 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -27,6 +27,7 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.spark.catalog.SparkBaseCatalog; import org.apache.paimon.spark.catalog.SupportFunction; +import org.apache.paimon.spark.catalog.SupportView; import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.FormatTableOptions; @@ -72,10 +73,12 @@ import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE; import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE; import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType; import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf; +import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace; +import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Spark {@link TableCatalog} for paimon. 
*/ -public class SparkCatalog extends SparkBaseCatalog implements SupportFunction { +public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, SupportView { private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class); @@ -126,10 +129,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction { @Override public void createNamespace(String[] namespace, Map<String, String> metadata) throws NamespaceAlreadyExistsException { - checkArgument( - isValidateNamespace(namespace), - "Namespace %s is not valid", - Arrays.toString(namespace)); + checkNamespace(namespace); try { catalog.createDatabase(namespace[0], false, metadata); } catch (Catalog.DatabaseAlreadyExistException e) { @@ -152,9 +152,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction { if (namespace.length == 0) { return listNamespaces(); } - if (!isValidateNamespace(namespace)) { - throw new NoSuchNamespaceException(namespace); - } + checkNamespace(namespace); try { catalog.getDatabase(namespace[0]); return new String[0][]; @@ -166,10 +164,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction { @Override public Map<String, String> loadNamespaceMetadata(String[] namespace) throws NoSuchNamespaceException { - checkArgument( - isValidateNamespace(namespace), - "Namespace %s is not valid", - Arrays.toString(namespace)); + checkNamespace(namespace); String dataBaseName = namespace[0]; try { return catalog.getDatabase(dataBaseName).options(); @@ -207,10 +202,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction { */ public boolean dropNamespace(String[] namespace, boolean cascade) throws NoSuchNamespaceException { - checkArgument( - isValidateNamespace(namespace), - "Namespace %s is not valid", - Arrays.toString(namespace)); + checkNamespace(namespace); try { catalog.dropDatabase(namespace[0], false, cascade); return true; @@ -224,10 +216,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction { @Override public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException { - checkArgument( - isValidateNamespace(namespace), - "Missing database in namespace: %s", - Arrays.toString(namespace)); + checkNamespace(namespace); try { return catalog.listTables(namespace[0]).stream() .map(table -> Identifier.of(namespace, table)) @@ -239,10 +228,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction { @Override public void invalidateTable(Identifier ident) { - try { - catalog.invalidateTable(toIdentifier(ident)); - } catch (NoSuchTableException ignored) { - } + catalog.invalidateTable(toIdentifier(ident)); } @Override @@ -347,7 +333,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction { try { catalog.dropTable(toIdentifier(ident), false); return true; - } catch (Catalog.TableNotExistException | NoSuchTableException e) { + } catch (Catalog.TableNotExistException e) { return false; } } @@ -454,10 +440,6 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction { } } - private boolean isValidateNamespace(String[] namespace) { - return namespace.length == 1; - } - @Override public void renameTable(Identifier oldIdent, Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException { @@ -472,15 +454,6 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction { // --------------------- tools ------------------------------------------ - protected 
org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident) - throws NoSuchTableException { - if (!isValidateNamespace(ident.namespace())) { - throw new NoSuchTableException(ident); - } - - return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], ident.name()); - } - protected org.apache.spark.sql.connector.catalog.Table loadSparkTable( Identifier ident, Map<String, String> extraOptions) throws NoSuchTableException { try { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java index 8bba67620..f6643f758 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java @@ -81,6 +81,10 @@ public class SparkTypeUtils { return type.accept(PaimonToSparkTypeVisitor.INSTANCE); } + public static org.apache.paimon.types.RowType toPaimonRowType(StructType type) { + return (RowType) toPaimonType(type); + } + public static org.apache.paimon.types.DataType toPaimonType(DataType dataType) { return SparkToPaimonTypeVisitor.visit(dataType); } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java new file mode 100644 index 000000000..b8ce86e89 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.catalog; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.spark.SparkTypeUtils; +import org.apache.paimon.view.View; +import org.apache.paimon.view.ViewImpl; + +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.types.StructType; + +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace; +import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier; + +/** Catalog methods for working with Views. 
*/ +public interface SupportView extends WithPaimonCatalog { + + default List<String> listViews(String[] namespace) throws NoSuchNamespaceException { + try { + checkNamespace(namespace); + return paimonCatalog().listViews(namespace[0]); + } catch (Catalog.DatabaseNotExistException e) { + throw new NoSuchNamespaceException(namespace); + } + } + + default View loadView(Identifier ident) throws Catalog.ViewNotExistException { + return paimonCatalog().getView(toIdentifier(ident)); + } + + default void createView( + Identifier ident, + StructType schema, + String queryText, + String comment, + Map<String, String> properties, + Boolean ignoreIfExists) + throws NoSuchNamespaceException { + org.apache.paimon.catalog.Identifier paimonIdent = toIdentifier(ident); + try { + paimonCatalog() + .createView( + paimonIdent, + new ViewImpl( + paimonIdent, + SparkTypeUtils.toPaimonRowType(schema), + queryText, + comment, + properties), + ignoreIfExists); + } catch (Catalog.ViewAlreadyExistException e) { + throw new RuntimeException("view already exists: " + ident, e); + } catch (Catalog.DatabaseNotExistException e) { + throw new NoSuchNamespaceException(ident.namespace()); + } + } + + default void dropView(Identifier ident, Boolean ignoreIfExists) { + try { + paimonCatalog().dropView(toIdentifier(ident), ignoreIfExists); + } catch (Catalog.ViewNotExistException e) { + throw new RuntimeException("view not exists: " + ident, e); + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java similarity index 53% copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala copy to paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java index 5befb88da..fca9df210 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java @@ -16,20 +16,26 @@ * limitations under the License. */ -package org.apache.paimon.spark +package org.apache.paimon.spark.utils; -import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LeafParsedStatement} -import org.apache.spark.sql.execution.command.LeafRunnableCommand -import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec +import org.apache.spark.sql.connector.catalog.Identifier; -package object leafnode { +import java.util.Arrays; - trait PaimonLeafParsedStatement extends LeafParsedStatement +import static org.apache.paimon.utils.Preconditions.checkArgument; - trait PaimonLeafRunnableCommand extends LeafRunnableCommand +/** Utils of catalog. 
*/ +public class CatalogUtils { - trait PaimonLeafCommand extends LeafCommand - - trait PaimonLeafV2CommandExec extends LeafV2CommandExec + public static void checkNamespace(String[] namespace) { + checkArgument( + namespace.length == 1, + "Paimon only support single namespace, but got %s", + Arrays.toString(namespace)); + } + public static org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident) { + checkNamespace(ident.namespace()); + return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], ident.name()); + } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala new file mode 100644 index 000000000..a375a2965 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.paimon.catalog.Catalog.ViewNotExistException +import org.apache.paimon.spark.SparkTypeUtils +import org.apache.paimon.spark.catalog.SupportView +import org.apache.paimon.view.View + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedRelation} +import org.apache.spark.sql.catalyst.expressions.{Alias, UpCast} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.parser.extensions.{CurrentOrigin, Origin} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.PaimonLookupCatalog + +case class PaimonViewResolver(spark: SparkSession) + extends Rule[LogicalPlan] + with PaimonLookupCatalog { + + protected lazy val catalogManager = spark.sessionState.catalogManager + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case u @ UnresolvedRelation(parts @ CatalogAndIdentifier(catalog: SupportView, ident), _, _) => + try { + val view = catalog.loadView(ident) + createViewRelation(parts, view) + } catch { + case _: ViewNotExistException => + u + } + } + + private def createViewRelation(nameParts: Seq[String], view: View): LogicalPlan = { + val parsedPlan = parseViewText(nameParts.toArray.mkString("."), view.query) + + val aliases = SparkTypeUtils.fromPaimonRowType(view.rowType()).fields.zipWithIndex.map { + case (expected, pos) => + val attr = GetColumnByOrdinal(pos, expected.dataType) + Alias(UpCast(attr, expected.dataType), expected.name)(explicitMetadata = + Some(expected.metadata)) + } + + SubqueryAlias(nameParts, Project(aliases, parsedPlan)) + } + + private def parseViewText(name: String, viewText: String): LogicalPlan = { + val origin = Origin( + objectType = Some("VIEW"), + objectName = Some(name) + ) + try { + CurrentOrigin.withOrigin(origin) { + try { + spark.sessionState.sqlParser.parseQuery(viewText) + } catch { + // For compatibility with Spark 3.2 and below + case _: NoSuchMethodError => + spark.sessionState.sqlParser.parsePlan(viewText) + } + } + } catch { + case _: ParseException => + throw new RuntimeException("Failed to parse view text: " + viewText) + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonViewCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonViewCommand.scala new file mode 100644 index 000000000..24b27bb0e --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonViewCommand.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.catalyst.plans.logical + +import org.apache.paimon.spark.leafnode.{PaimonBinaryCommand, PaimonUnaryCommand} + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, ShowViews, Statistics} +import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier} + +case class CreatePaimonView( + child: LogicalPlan, + queryText: String, + query: LogicalPlan, + columnAliases: Seq[String], + columnComments: Seq[Option[String]], + queryColumnNames: Seq[String] = Seq.empty, + comment: Option[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean) + extends PaimonBinaryCommand { + + override def left: LogicalPlan = child + + override def right: LogicalPlan = query + + override protected def withNewChildrenInternal( + newLeft: LogicalPlan, + newRight: LogicalPlan): LogicalPlan = + copy(child = newLeft, query = newRight) +} + +case class DropPaimonView(child: LogicalPlan, ifExists: Boolean) extends PaimonUnaryCommand { + + override protected def withNewChildInternal(newChild: LogicalPlan): DropPaimonView = + copy(child = newChild) +} + +case class ShowPaimonViews( + namespace: LogicalPlan, + pattern: Option[String], + override val output: Seq[Attribute] = ShowViews.getOutputAttrs) + extends PaimonUnaryCommand { + + override def child: LogicalPlan = namespace + + override protected def withNewChildInternal(newChild: LogicalPlan): ShowPaimonViews = + copy(namespace = newChild) +} + +/** Copy from spark 3.4+ */ +case class ResolvedIdentifier(catalog: CatalogPlugin, identifier: Identifier) extends LeafNode { + + override def output: Seq[Attribute] = Nil + + override def stats: Statistics = Statistics.DUMMY +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala index 0c3d3e6b6..0c3865f7d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala @@ -19,10 +19,12 @@ package org.apache.paimon.spark.execution import org.apache.paimon.spark.{SparkCatalog, SparkUtils} -import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand} +import org.apache.paimon.spark.catalog.SupportView +import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand} import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.ResolvedNamespace import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, PredicateHelper} import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan} import org.apache.spark.sql.connector.catalog.{Identifier, PaimonLookupCatalog, TableCatalog} @@ -65,6 +67,39 @@ case class PaimonStrategy(spark: SparkSession) case RenameTagCommand(PaimonCatalogAndIdentifier(catalog, ident), sourceTag, targetTag) => RenameTagExec(catalog, ident, 
sourceTag, targetTag) :: Nil + case CreatePaimonView( + ResolvedIdentifier(viewCatalog: SupportView, ident), + queryText, + query, + columnAliases, + columnComments, + queryColumnNames, + comment, + properties, + allowExisting, + replace) => + CreatePaimonViewExec( + viewCatalog, + ident, + queryText, + query.schema, + columnAliases, + columnComments, + queryColumnNames, + comment, + properties, + allowExisting, + replace) :: Nil + + case DropPaimonView(ResolvedIdentifier(viewCatalog: SupportView, ident), ifExists) => + DropPaimonViewExec(viewCatalog, ident, ifExists) :: Nil + + // A new member was added to ResolvedNamespace since spark4.0, + // unapply pattern matching is not used here to ensure compatibility across multiple spark versions. + case ShowPaimonViews(r: ResolvedNamespace, pattern, output) + if r.catalog.isInstanceOf[SupportView] => + ShowPaimonViewsExec(output, r.catalog.asInstanceOf[SupportView], r.namespace, pattern) :: Nil + case _ => Nil } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala new file mode 100644 index 000000000..7a4b907c7 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.execution + +import org.apache.paimon.spark.catalog.SupportView +import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow} +import org.apache.spark.sql.catalyst.util.StringUtils +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +case class CreatePaimonViewExec( + catalog: SupportView, + ident: Identifier, + queryText: String, + viewSchema: StructType, + columnAliases: Seq[String], + columnComments: Seq[Option[String]], + queryColumnNames: Seq[String], + comment: Option[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean +) extends PaimonLeafV2CommandExec { + + override def output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + if (columnAliases.nonEmpty || columnComments.nonEmpty || queryColumnNames.nonEmpty) { + throw new UnsupportedOperationException( + "columnAliases, columnComments and queryColumnNames are not supported now") + } + + // Note: for replace just drop then create ,this operation is non-atomic. + if (replace) { + catalog.dropView(ident, true) + } + + catalog.createView( + ident, + viewSchema, + queryText, + comment.orNull, + properties.asJava, + allowExisting) + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"CreatePaimonViewExec: $ident" + } +} + +case class DropPaimonViewExec(catalog: SupportView, ident: Identifier, ifExists: Boolean) + extends PaimonLeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + catalog.dropView(ident, ifExists) + Nil + } + + override def simpleString(maxFields: Int): String = { + s"DropPaimonViewExec: $ident" + } +} + +case class ShowPaimonViewsExec( + output: Seq[Attribute], + catalog: SupportView, + namespace: Seq[String], + pattern: Option[String]) + extends PaimonLeafV2CommandExec { + + override protected def run(): Seq[InternalRow] = { + val rows = new ArrayBuffer[InternalRow]() + catalog.listViews(namespace.toArray).asScala.map { + viewName => + if (pattern.forall(StringUtils.filterPattern(Seq(viewName), _).nonEmpty)) { + rows += new GenericInternalRow( + Array( + UTF8String.fromString(namespace.mkString(".")), + UTF8String.fromString(viewName), + false)) + } + } + rows.toSeq + } + + override def simpleString(maxFields: Int): String = { + s"ShowPaimonViewsExec: $namespace" + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala index 4fe217ee0..6f47a77ef 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala @@ -18,7 +18,7 @@ package org.apache.paimon.spark.extensions -import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable} +import 
org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver} import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueries} import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions import org.apache.paimon.spark.execution.PaimonStrategy @@ -37,6 +37,7 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) { // analyzer extensions extensions.injectResolutionRule(spark => new PaimonAnalysis(spark)) extensions.injectResolutionRule(spark => PaimonProcedureResolver(spark)) + extensions.injectResolutionRule(spark => PaimonViewResolver(spark)) extensions.injectResolutionRule(spark => PaimonIncompatibleResolutionRules(spark)) extensions.injectPostHocResolutionRule(spark => PaimonPostHocResolutionRules(spark)) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala index 5befb88da..6ebab0384 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala @@ -18,7 +18,7 @@ package org.apache.paimon.spark -import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LeafParsedStatement} +import org.apache.spark.sql.catalyst.plans.logical.{BinaryCommand, LeafCommand, LeafParsedStatement, UnaryCommand} import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec @@ -30,6 +30,9 @@ package object leafnode { trait PaimonLeafCommand extends LeafCommand - trait PaimonLeafV2CommandExec extends LeafV2CommandExec + trait PaimonUnaryCommand extends UnaryCommand + + trait PaimonBinaryCommand extends BinaryCommand + trait PaimonLeafV2CommandExec extends LeafV2CommandExec } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala index dd0a48159..9ece18693 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala @@ -23,7 +23,7 @@ import org.antlr.v4.runtime.atn.PredictionMode import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException} import org.antlr.v4.runtime.tree.TerminalNodeImpl import org.apache.spark.internal.Logging -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface} @@ -61,7 +61,7 @@ class PaimonSparkSqlExtensionsParser(val delegate: ParserInterface) parse(sqlTextAfterSubstitution)(parser => astBuilder.visit(parser.singleStatement())) .asInstanceOf[LogicalPlan] } else { - delegate.parsePlan(sqlText) + 
RewritePaimonViewCommands(SparkSession.active).apply(delegate.parsePlan(sqlText)) } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala index b864894e7..a1289a5f0 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.parser.extensions import org.apache.paimon.spark.catalyst.plans.logical -import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions} +import org.apache.paimon.spark.catalyst.plans.logical._ import org.apache.paimon.utils.TimeUtils import org.antlr.v4.runtime._ @@ -212,5 +212,16 @@ object CurrentOrigin { def get: Origin = value.get() def set(o: Origin): Unit = value.set(o) def reset(): Unit = value.set(Origin()) + + def withOrigin[A](o: Origin)(f: => A): A = { + // remember the previous one so it can be reset to this + // way withOrigin can be recursive + val previous = get + set(o) + val ret = + try f + finally { set(previous) } + ret + } } /* Apache Spark copy end */ diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala new file mode 100644 index 000000000..f69e5d920 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.parser.extensions + +import org.apache.paimon.spark.catalog.SupportView +import org.apache.paimon.spark.catalyst.plans.logical.{CreatePaimonView, DropPaimonView, ResolvedIdentifier, ShowPaimonViews} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, ResolvedNamespace, UnresolvedIdentifier} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog} + +case class RewritePaimonViewCommands(spark: SparkSession) + extends Rule[LogicalPlan] + with LookupCatalog { + + protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + + // A new member was added to CreatePaimonView since spark4.0, + // unapply pattern matching is not used here to ensure compatibility across multiple spark versions. + case c: CreateView => + ResolvedIdent + .unapply(c.child) + .map { + resolved => + CreatePaimonView( + child = resolved, + queryText = c.originalText.get, + query = CTESubstitution.apply(c.query), + columnAliases = c.userSpecifiedColumns.map(_._1), + columnComments = c.userSpecifiedColumns.map(_._2.orElse(None)), + comment = c.comment, + properties = c.properties, + allowExisting = c.allowExisting, + replace = c.replace + ) + } + .getOrElse(c) + + case DropView(ResolvedIdent(resolved), ifExists: Boolean) => + DropPaimonView(resolved, ifExists) + + case ShowViews(_, pattern, output) if catalogManager.currentCatalog.isInstanceOf[SupportView] => + ShowPaimonViews( + ResolvedNamespace(catalogManager.currentCatalog, catalogManager.currentNamespace), + pattern, + output) + } + + private object ResolvedIdent { + def unapply(unresolved: Any): Option[ResolvedIdentifier] = unresolved match { + case UnresolvedIdentifier(CatalogAndIdentifier(viewCatalog: SupportView, ident), _) => + Some(ResolvedIdentifier(viewCatalog, ident)) + case _ => + None + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala index 33b993160..56922ae2a 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala @@ -241,12 +241,12 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { spark.sql(s"CREATE DATABASE paimon_db") spark.sql(s"USE paimon_db") spark.sql(s"CREATE TABLE paimon_tbl (id int, name string, dt string) using paimon") - // Currently, only spark_catalog supports create other table or view + // Only spark_catalog supports create other table if (catalogName.equals(sparkCatalogName)) { spark.sql(s"CREATE TABLE parquet_tbl (id int, name string, dt string) using parquet") spark.sql(s"CREATE VIEW parquet_tbl_view AS SELECT * FROM parquet_tbl") - spark.sql(s"CREATE VIEW paimon_tbl_view AS SELECT * FROM paimon_tbl") } + spark.sql(s"CREATE VIEW paimon_tbl_view AS SELECT * FROM paimon_tbl") spark.sql(s"USE default") spark.sql(s"DROP DATABASE paimon_db CASCADE") } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala new file mode 100644 index 000000000..39ed8e8a7 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonHiveTestBase + +import org.apache.spark.sql.Row + +abstract class PaimonViewTestBase extends PaimonHiveTestBase { + + test("Paimon View: create and drop view") { + Seq(sparkCatalogName, paimonHiveCatalogName).foreach { + catalogName => + { + sql(s"USE $catalogName") + withDatabase("test_db") { + sql("CREATE DATABASE test_db") + sql("USE test_db") + withTable("t") { + withView("v1") { + sql("CREATE TABLE t (id INT) USING paimon") + sql("INSERT INTO t VALUES (1), (2)") + + sql("CREATE VIEW v1 AS SELECT * FROM t") + checkAnswer(sql("SHOW VIEWS"), Seq(Row("test_db", "v1", false))) + checkAnswer(sql("SELECT * FROM v1"), Seq(Row(1), Row(2))) + checkAnswer( + sql("SELECT * FROM v1 WHERE id >= (SELECT max(id) FROM v1)"), + Seq(Row(2))) + + // test drop view + sql("DROP VIEW IF EXISTS v1") + checkAnswer(sql("SHOW VIEWS"), Seq()) + sql("CREATE VIEW v1 AS SELECT * FROM t WHERE id > 1") + checkAnswer(sql("SHOW VIEWS"), Seq(Row("test_db", "v1", false))) + checkAnswer(sql("SELECT * FROM v1"), Seq(Row(2))) + + // test create or replace view + intercept[Exception] { + sql("CREATE VIEW v1 AS SELECT * FROM t WHERE id < 2") + } + sql("CREATE OR REPLACE VIEW v1 AS SELECT * FROM t WHERE id < 2") + checkAnswer(sql("SELECT * FROM v1"), Seq(Row(1))) + } + } + } + } + } + } + + test("Paimon View: show views") { + Seq(sparkCatalogName, paimonHiveCatalogName).foreach { + catalogName => + { + sql(s"USE $catalogName") + withDatabase("test_db") { + sql("CREATE DATABASE test_db") + sql("USE test_db") + withTable("t") { + withView("va", "vab", "vc") { + sql("CREATE TABLE t (id INT) USING paimon") + sql("CREATE VIEW va AS SELECT * FROM t") + sql("CREATE VIEW vab AS SELECT * FROM t") + sql("CREATE VIEW vc AS SELECT * FROM t") + checkAnswer( + sql("SHOW VIEWS"), + Seq( + Row("test_db", "va", false), + Row("test_db", "vab", false), + Row("test_db", "vc", false))) + checkAnswer( + sql("SHOW VIEWS LIKE 'va*'"), + Seq(Row("test_db", "va", false), Row("test_db", "vab", false))) + } + } + } + } + } + } +}
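
For reference, below is a minimal end-to-end sketch of what this commit enables. It is not part of the commit itself: the object name, master, catalog name ("paimon"), and warehouse path are illustrative placeholders, and the setup assumes the usual Paimon Spark configuration (SparkCatalog registered as a session catalog plus PaimonSparkSessionExtensions). The SQL statements mirror those exercised in PaimonViewTestBase.

    import org.apache.spark.sql.SparkSession

    object PaimonViewExample {
      def main(args: Array[String]): Unit = {
        // Assumed configuration: SparkCatalog (which now also implements
        // SupportView) registered under the catalog name "paimon", with a
        // local warehouse path; adjust both for a real deployment.
        val spark = SparkSession
          .builder()
          .master("local[2]")
          .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
          .config("spark.sql.catalog.paimon.warehouse", "/tmp/paimon-warehouse")
          .config(
            "spark.sql.extensions",
            "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
          .getOrCreate()

        spark.sql("USE paimon")
        spark.sql("CREATE DATABASE IF NOT EXISTS test_db")
        spark.sql("USE test_db")
        spark.sql("CREATE TABLE t (id INT) USING paimon")
        spark.sql("INSERT INTO t VALUES (1), (2)")

        // CREATE VIEW is rewritten by RewritePaimonViewCommands into a
        // CreatePaimonView plan and executed by CreatePaimonViewExec, which
        // stores the query text and schema through the Paimon catalog.
        spark.sql("CREATE VIEW v1 AS SELECT * FROM t WHERE id > 1")

        // Reads resolve through PaimonViewResolver, which re-parses the
        // stored query text and aliases the output to the view's row type.
        spark.sql("SELECT * FROM v1").show() // expected single row: 2

        // SHOW VIEWS and DROP VIEW map to ShowPaimonViewsExec and
        // DropPaimonViewExec respectively.
        spark.sql("SHOW VIEWS").show()
        spark.sql("DROP VIEW IF EXISTS v1")

        spark.stop()
      }
    }

Note, per the comment in CreatePaimonViewExec, that CREATE OR REPLACE VIEW is implemented as a drop followed by a create, so a replace is not atomic.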