This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 19d7f66a0 [spark] Introduce View Support to SparkCatalog (#4538)
19d7f66a0 is described below

commit 19d7f66a006d02ee3ad233937fc6116a07acefbb
Author: Zouxxyy <zouxinyu....@alibaba-inc.com>
AuthorDate: Tue Nov 19 11:02:16 2024 +0800

    [spark] Introduce View Support to SparkCatalog (#4538)
---
 .../extensions/RewritePaimonViewCommands.scala     |  80 ++++++++++++++
 .../apache/paimon/spark/sql/PaimonViewTest.scala}  |  18 +---
 .../extensions/RewritePaimonViewCommands.scala     |  79 ++++++++++++++
 .../apache/paimon/spark/sql/PaimonViewTest.scala}  |  18 +---
 .../apache/paimon/spark/sql/PaimonViewTest.scala}  |  18 +---
 .../apache/paimon/spark/sql/PaimonViewTest.scala}  |  18 +---
 .../apache/paimon/spark/sql/PaimonViewTest.scala}  |  18 +---
 .../java/org/apache/paimon/spark/SparkCatalog.java |  49 ++-------
 .../org/apache/paimon/spark/SparkTypeUtils.java    |   4 +
 .../apache/paimon/spark/catalog/SupportView.java   |  86 +++++++++++++++
 .../apache/paimon/spark/utils/CatalogUtils.java}   |  26 +++--
 .../catalyst/analysis/PaimonViewResolver.scala     |  85 +++++++++++++++
 .../catalyst/plans/logical/PaimonViewCommand.scala |  74 +++++++++++++
 .../paimon/spark/execution/PaimonStrategy.scala    |  37 ++++++-
 .../paimon/spark/execution/PaimonViewExec.scala    | 117 +++++++++++++++++++++
 .../extensions/PaimonSparkSessionExtensions.scala  |   3 +-
 .../org/apache/paimon/spark/leafnode/package.scala |   7 +-
 .../PaimonSparkSqlExtensionsParser.scala           |   4 +-
 .../extensions/PaimonSqlExtensionsAstBuilder.scala |  13 ++-
 .../extensions/RewritePaimonViewCommands.scala     |  77 ++++++++++++++
 .../spark/sql/DDLWithHiveCatalogTestBase.scala     |   4 +-
 .../paimon/spark/sql/PaimonViewTestBase.scala      |  96 +++++++++++++++++
 22 files changed, 794 insertions(+), 137 deletions(-)
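
To situate the diff, the end-user surface added by this commit is ordinary Spark SQL view DDL routed through a Paimon catalog. A minimal sketch, assuming a session whose current catalog is a Paimon SparkCatalog (see PaimonViewTestBase at the end of this diff):

    spark.sql("CREATE TABLE t (id INT) USING paimon")
    spark.sql("CREATE VIEW v1 AS SELECT * FROM t")   // planned as CreatePaimonViewExec
    spark.sql("SHOW VIEWS")                          // planned as ShowPaimonViewsExec
    spark.sql("CREATE OR REPLACE VIEW v1 AS SELECT * FROM t WHERE id > 1")
    spark.sql("DROP VIEW IF EXISTS v1")              // planned as DropPaimonViewExec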

diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
new file mode 100644
index 000000000..e759edd0c
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.apache.paimon.spark.catalog.SupportView
+import org.apache.paimon.spark.catalyst.plans.logical.{CreatePaimonView, DropPaimonView, ResolvedIdentifier, ShowPaimonViews}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, ResolvedNamespace, UnresolvedView}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
+
+case class RewritePaimonViewCommands(spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with LookupCatalog {
+
+  protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+
+    case CreateViewStatement(
+          ResolvedIdent(resolved),
+          userSpecifiedColumns,
+          comment,
+          properties,
+          Some(originalText),
+          child,
+          allowExisting,
+          replace,
+          _) =>
+      CreatePaimonView(
+        child = resolved,
+        queryText = originalText,
+        query = CTESubstitution.apply(child),
+        columnAliases = userSpecifiedColumns.map(_._1),
+        columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty)),
+        comment = comment,
+        properties = properties,
+        allowExisting = allowExisting,
+        replace = replace
+      )
+
+    case DropView(ResolvedIdent(resolved), ifExists: Boolean) =>
+      DropPaimonView(resolved, ifExists)
+
+    case ShowViews(_, pattern, output) if catalogManager.currentCatalog.isInstanceOf[SupportView] =>
+      ShowPaimonViews(
+        ResolvedNamespace(catalogManager.currentCatalog, catalogManager.currentNamespace),
+        pattern,
+        output)
+  }
+
+  private object ResolvedIdent {
+    def unapply(unresolved: Any): Option[ResolvedIdentifier] = unresolved match {
+      case CatalogAndIdentifier(viewCatalog: SupportView, ident) =>
+        Some(ResolvedIdentifier(viewCatalog, ident))
+      case UnresolvedView(CatalogAndIdentifier(viewCatalog: SupportView, ident), _, _, _) =>
+        Some(ResolvedIdentifier(viewCatalog, ident))
+      case _ =>
+        None
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
similarity index 60%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
copy to paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
index 5befb88da..6ab8a2671 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
+++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
@@ -16,20 +16,6 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.sql
 
-import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LeafParsedStatement}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
-
-package object leafnode {
-
-  trait PaimonLeafParsedStatement extends LeafParsedStatement
-
-  trait PaimonLeafRunnableCommand extends LeafRunnableCommand
-
-  trait PaimonLeafCommand extends LeafCommand
-
-  trait PaimonLeafV2CommandExec extends LeafV2CommandExec
-
-}
+class PaimonViewTest extends PaimonViewTestBase {}
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
new file mode 100644
index 000000000..5d57cda2f
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.apache.paimon.spark.catalog.SupportView
+import org.apache.paimon.spark.catalyst.plans.logical.{CreatePaimonView, DropPaimonView, ResolvedIdentifier, ShowPaimonViews}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, ResolvedNamespace, UnresolvedDBObjectName, UnresolvedView}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
+
+case class RewritePaimonViewCommands(spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with LookupCatalog {
+
+  protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+
+    case CreateView(
+          ResolvedIdent(resolved),
+          userSpecifiedColumns,
+          comment,
+          properties,
+          Some(queryText),
+          query,
+          allowExisting,
+          replace) =>
+      CreatePaimonView(
+        child = resolved,
+        queryText = queryText,
+        query = CTESubstitution.apply(query),
+        columnAliases = userSpecifiedColumns.map(_._1),
+        columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty)),
+        comment = comment,
+        properties = properties,
+        allowExisting = allowExisting,
+        replace = replace
+      )
+
+    case DropView(ResolvedIdent(resolved), ifExists: Boolean) =>
+      DropPaimonView(resolved, ifExists)
+
+    case ShowViews(_, pattern, output) if catalogManager.currentCatalog.isInstanceOf[SupportView] =>
+      ShowPaimonViews(
+        ResolvedNamespace(catalogManager.currentCatalog, catalogManager.currentNamespace),
+        pattern,
+        output)
+  }
+
+  private object ResolvedIdent {
+    def unapply(unresolved: Any): Option[ResolvedIdentifier] = unresolved match {
+      case UnresolvedDBObjectName(CatalogAndIdentifier(viewCatalog: SupportView, ident), _) =>
+        Some(ResolvedIdentifier(viewCatalog, ident))
+      case UnresolvedView(CatalogAndIdentifier(viewCatalog: SupportView, ident), _, _, _) =>
+        Some(ResolvedIdentifier(viewCatalog, ident))
+      case _ =>
+        None
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
similarity index 60%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
copy to paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
index 5befb88da..6ab8a2671 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
+++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
@@ -16,20 +16,6 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.sql
 
-import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LeafParsedStatement}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
-
-package object leafnode {
-
-  trait PaimonLeafParsedStatement extends LeafParsedStatement
-
-  trait PaimonLeafRunnableCommand extends LeafRunnableCommand
-
-  trait PaimonLeafCommand extends LeafCommand
-
-  trait PaimonLeafV2CommandExec extends LeafV2CommandExec
-
-}
+class PaimonViewTest extends PaimonViewTestBase {}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
similarity index 60%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
copy to paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
index 5befb88da..6ab8a2671 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
@@ -16,20 +16,6 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.sql
 
-import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LeafParsedStatement}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
-
-package object leafnode {
-
-  trait PaimonLeafParsedStatement extends LeafParsedStatement
-
-  trait PaimonLeafRunnableCommand extends LeafRunnableCommand
-
-  trait PaimonLeafCommand extends LeafCommand
-
-  trait PaimonLeafV2CommandExec extends LeafV2CommandExec
-
-}
+class PaimonViewTest extends PaimonViewTestBase {}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
similarity index 60%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
copy to paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
index 5befb88da..6ab8a2671 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
@@ -16,20 +16,6 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.sql
 
-import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LeafParsedStatement}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
-
-package object leafnode {
-
-  trait PaimonLeafParsedStatement extends LeafParsedStatement
-
-  trait PaimonLeafRunnableCommand extends LeafRunnableCommand
-
-  trait PaimonLeafCommand extends LeafCommand
-
-  trait PaimonLeafV2CommandExec extends LeafV2CommandExec
-
-}
+class PaimonViewTest extends PaimonViewTestBase {}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
similarity index 60%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
copy to paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
index 5befb88da..6ab8a2671 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
+++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
@@ -16,20 +16,6 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.sql
 
-import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LeafParsedStatement}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
-
-package object leafnode {
-
-  trait PaimonLeafParsedStatement extends LeafParsedStatement
-
-  trait PaimonLeafRunnableCommand extends LeafRunnableCommand
-
-  trait PaimonLeafCommand extends LeafCommand
-
-  trait PaimonLeafV2CommandExec extends LeafV2CommandExec
-
-}
+class PaimonViewTest extends PaimonViewTestBase {}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 89448c1f4..3b9af1694 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -27,6 +27,7 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.spark.catalog.SparkBaseCatalog;
 import org.apache.paimon.spark.catalog.SupportFunction;
+import org.apache.paimon.spark.catalog.SupportView;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.FormatTableOptions;
 
@@ -72,10 +73,12 @@ import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
 import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
 import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
 import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
+import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
+import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Spark {@link TableCatalog} for paimon. */
-public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
+public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, SupportView {
 
     private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class);
 
@@ -126,10 +129,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
     @Override
     public void createNamespace(String[] namespace, Map<String, String> metadata)
             throws NamespaceAlreadyExistsException {
-        checkArgument(
-                isValidateNamespace(namespace),
-                "Namespace %s is not valid",
-                Arrays.toString(namespace));
+        checkNamespace(namespace);
         try {
             catalog.createDatabase(namespace[0], false, metadata);
         } catch (Catalog.DatabaseAlreadyExistException e) {
@@ -152,9 +152,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
         if (namespace.length == 0) {
             return listNamespaces();
         }
-        if (!isValidateNamespace(namespace)) {
-            throw new NoSuchNamespaceException(namespace);
-        }
+        checkNamespace(namespace);
         try {
             catalog.getDatabase(namespace[0]);
             return new String[0][];
@@ -166,10 +164,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
     @Override
     public Map<String, String> loadNamespaceMetadata(String[] namespace)
             throws NoSuchNamespaceException {
-        checkArgument(
-                isValidateNamespace(namespace),
-                "Namespace %s is not valid",
-                Arrays.toString(namespace));
+        checkNamespace(namespace);
         String dataBaseName = namespace[0];
         try {
             return catalog.getDatabase(dataBaseName).options();
@@ -207,10 +202,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
      */
     public boolean dropNamespace(String[] namespace, boolean cascade)
             throws NoSuchNamespaceException {
-        checkArgument(
-                isValidateNamespace(namespace),
-                "Namespace %s is not valid",
-                Arrays.toString(namespace));
+        checkNamespace(namespace);
         try {
             catalog.dropDatabase(namespace[0], false, cascade);
             return true;
@@ -224,10 +216,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
 
     @Override
     public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
-        checkArgument(
-                isValidateNamespace(namespace),
-                "Missing database in namespace: %s",
-                Arrays.toString(namespace));
+        checkNamespace(namespace);
         try {
             return catalog.listTables(namespace[0]).stream()
                     .map(table -> Identifier.of(namespace, table))
@@ -239,10 +228,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
 
     @Override
     public void invalidateTable(Identifier ident) {
-        try {
-            catalog.invalidateTable(toIdentifier(ident));
-        } catch (NoSuchTableException ignored) {
-        }
+        catalog.invalidateTable(toIdentifier(ident));
     }
 
     @Override
@@ -347,7 +333,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
         try {
             catalog.dropTable(toIdentifier(ident), false);
             return true;
-        } catch (Catalog.TableNotExistException | NoSuchTableException e) {
+        } catch (Catalog.TableNotExistException e) {
             return false;
         }
     }
@@ -454,10 +440,6 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
         }
     }
 
-    private boolean isValidateNamespace(String[] namespace) {
-        return namespace.length == 1;
-    }
-
     @Override
     public void renameTable(Identifier oldIdent, Identifier newIdent)
             throws NoSuchTableException, TableAlreadyExistsException {
@@ -472,15 +454,6 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
 
     // --------------------- tools ------------------------------------------
 
-    protected org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident)
-            throws NoSuchTableException {
-        if (!isValidateNamespace(ident.namespace())) {
-            throw new NoSuchTableException(ident);
-        }
-
-        return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], ident.name());
-    }
-
     protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
            Identifier ident, Map<String, String> extraOptions) throws NoSuchTableException {
         try {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
index 8bba67620..f6643f758 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
@@ -81,6 +81,10 @@ public class SparkTypeUtils {
         return type.accept(PaimonToSparkTypeVisitor.INSTANCE);
     }
 
+    public static org.apache.paimon.types.RowType toPaimonRowType(StructType type) {
+        return (RowType) toPaimonType(type);
+    }
+
     public static org.apache.paimon.types.DataType toPaimonType(DataType dataType) {
         return SparkToPaimonTypeVisitor.visit(dataType);
     }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java
new file mode 100644
index 000000000..b8ce86e89
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalog;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.spark.SparkTypeUtils;
+import org.apache.paimon.view.View;
+import org.apache.paimon.view.ViewImpl;
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
+import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
+
+/** Catalog methods for working with Views. */
+public interface SupportView extends WithPaimonCatalog {
+
+    default List<String> listViews(String[] namespace) throws NoSuchNamespaceException {
+        try {
+            checkNamespace(namespace);
+            return paimonCatalog().listViews(namespace[0]);
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new NoSuchNamespaceException(namespace);
+        }
+    }
+
+    default View loadView(Identifier ident) throws Catalog.ViewNotExistException {
+        return paimonCatalog().getView(toIdentifier(ident));
+    }
+
+    default void createView(
+            Identifier ident,
+            StructType schema,
+            String queryText,
+            String comment,
+            Map<String, String> properties,
+            Boolean ignoreIfExists)
+            throws NoSuchNamespaceException {
+        org.apache.paimon.catalog.Identifier paimonIdent = toIdentifier(ident);
+        try {
+            paimonCatalog()
+                    .createView(
+                            paimonIdent,
+                            new ViewImpl(
+                                    paimonIdent,
+                                    SparkTypeUtils.toPaimonRowType(schema),
+                                    queryText,
+                                    comment,
+                                    properties),
+                            ignoreIfExists);
+        } catch (Catalog.ViewAlreadyExistException e) {
+            throw new RuntimeException("view already exists: " + ident, e);
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new NoSuchNamespaceException(ident.namespace());
+        }
+    }
+
+    default void dropView(Identifier ident, Boolean ignoreIfExists) {
+        try {
+            paimonCatalog().dropView(toIdentifier(ident), ignoreIfExists);
+        } catch (Catalog.ViewNotExistException e) {
+            throw new RuntimeException("view does not exist: " + ident, e);
+        }
+    }
+}
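
For illustration, a rough Scala sketch of exercising these default methods directly against a registered SparkCatalog; the catalog name "paimon", the spark session variable, and the sample schema are assumptions, not part of the patch:

    import scala.collection.JavaConverters._
    import org.apache.paimon.spark.catalog.SupportView
    import org.apache.spark.sql.connector.catalog.Identifier
    import org.apache.spark.sql.types.StructType

    // Assumes spark.sql.catalog.paimon = org.apache.paimon.spark.SparkCatalog was configured.
    val views = spark.sessionState.catalogManager.catalog("paimon").asInstanceOf[SupportView]
    val ident = Identifier.of(Array("test_db"), "v1")
    views.createView(ident, new StructType().add("id", "int"), "SELECT * FROM t", null, Map.empty[String, String].asJava, false)
    println(views.listViews(Array("test_db")))  // should contain "v1"
    views.dropView(ident, true)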
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
similarity index 53%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
copy to paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
index 5befb88da..fca9df210 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
@@ -16,20 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.utils;
 
-import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LeafParsedStatement}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
+import org.apache.spark.sql.connector.catalog.Identifier;
 
-package object leafnode {
+import java.util.Arrays;
 
-  trait PaimonLeafParsedStatement extends LeafParsedStatement
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-  trait PaimonLeafRunnableCommand extends LeafRunnableCommand
+/** Utils of catalog. */
+public class CatalogUtils {
 
-  trait PaimonLeafCommand extends LeafCommand
-
-  trait PaimonLeafV2CommandExec extends LeafV2CommandExec
+    public static void checkNamespace(String[] namespace) {
+        checkArgument(
+                namespace.length == 1,
+                "Paimon only supports a single namespace, but got %s",
+                Arrays.toString(namespace));
+    }
 
+    public static org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident) {
+        checkNamespace(ident.namespace());
+        return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], ident.name());
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
new file mode 100644
index 000000000..a375a2965
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.paimon.catalog.Catalog.ViewNotExistException
+import org.apache.paimon.spark.SparkTypeUtils
+import org.apache.paimon.spark.catalog.SupportView
+import org.apache.paimon.view.View
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.{Alias, UpCast}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.parser.extensions.{CurrentOrigin, Origin}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.PaimonLookupCatalog
+
+case class PaimonViewResolver(spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with PaimonLookupCatalog {
+
+  protected lazy val catalogManager = spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    case u @ UnresolvedRelation(parts @ CatalogAndIdentifier(catalog: SupportView, ident), _, _) =>
+      try {
+        val view = catalog.loadView(ident)
+        createViewRelation(parts, view)
+      } catch {
+        case _: ViewNotExistException =>
+          u
+      }
+  }
+
+  private def createViewRelation(nameParts: Seq[String], view: View): LogicalPlan = {
+    val parsedPlan = parseViewText(nameParts.toArray.mkString("."), view.query)
+
+    val aliases = SparkTypeUtils.fromPaimonRowType(view.rowType()).fields.zipWithIndex.map {
+      case (expected, pos) =>
+        val attr = GetColumnByOrdinal(pos, expected.dataType)
+        Alias(UpCast(attr, expected.dataType), expected.name)(explicitMetadata =
+          Some(expected.metadata))
+    }
+
+    SubqueryAlias(nameParts, Project(aliases, parsedPlan))
+  }
+
+  private def parseViewText(name: String, viewText: String): LogicalPlan = {
+    val origin = Origin(
+      objectType = Some("VIEW"),
+      objectName = Some(name)
+    )
+    try {
+      CurrentOrigin.withOrigin(origin) {
+        try {
+          spark.sessionState.sqlParser.parseQuery(viewText)
+        } catch {
+          // For compatibility with Spark 3.2 and below
+          case _: NoSuchMethodError =>
+            spark.sessionState.sqlParser.parsePlan(viewText)
+        }
+      }
+    } catch {
+      case _: ParseException =>
+        throw new RuntimeException("Failed to parse view text: " + viewText)
+    }
+  }
+}
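
Conceptually, for a view v1 stored with query text "SELECT * FROM t", the rule above re-parses the stored SQL and wraps it so each output column is cast back to the view's persisted schema. A sketch of the resulting plan shape (illustrative, not exact analyzer output):

    SubqueryAlias(test_db.v1,
      Project(
        Seq(Alias(UpCast(GetColumnByOrdinal(0, IntegerType), IntegerType), "id")),
        <plan parsed from "SELECT * FROM t">))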
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonViewCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonViewCommand.scala
new file mode 100644
index 000000000..24b27bb0e
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonViewCommand.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.plans.logical
+
+import org.apache.paimon.spark.leafnode.{PaimonBinaryCommand, PaimonUnaryCommand}
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, ShowViews, Statistics}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier}
+
+case class CreatePaimonView(
+    child: LogicalPlan,
+    queryText: String,
+    query: LogicalPlan,
+    columnAliases: Seq[String],
+    columnComments: Seq[Option[String]],
+    queryColumnNames: Seq[String] = Seq.empty,
+    comment: Option[String],
+    properties: Map[String, String],
+    allowExisting: Boolean,
+    replace: Boolean)
+  extends PaimonBinaryCommand {
+
+  override def left: LogicalPlan = child
+
+  override def right: LogicalPlan = query
+
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan,
+      newRight: LogicalPlan): LogicalPlan =
+    copy(child = newLeft, query = newRight)
+}
+
+case class DropPaimonView(child: LogicalPlan, ifExists: Boolean) extends PaimonUnaryCommand {
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): DropPaimonView =
+    copy(child = newChild)
+}
+
+case class ShowPaimonViews(
+    namespace: LogicalPlan,
+    pattern: Option[String],
+    override val output: Seq[Attribute] = ShowViews.getOutputAttrs)
+  extends PaimonUnaryCommand {
+
+  override def child: LogicalPlan = namespace
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): ShowPaimonViews =
+    copy(namespace = newChild)
+}
+
+/** Copied from Spark 3.4+. */
+case class ResolvedIdentifier(catalog: CatalogPlugin, identifier: Identifier) extends LeafNode {
+
+  override def output: Seq[Attribute] = Nil
+
+  override def stats: Statistics = Statistics.DUMMY
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index 0c3d3e6b6..0c3865f7d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -19,10 +19,12 @@
 package org.apache.paimon.spark.execution
 
 import org.apache.paimon.spark.{SparkCatalog, SparkUtils}
-import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand}
+import org.apache.paimon.spark.catalog.SupportView
+import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand}
 
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ResolvedNamespace
 import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan}
 import org.apache.spark.sql.connector.catalog.{Identifier, PaimonLookupCatalog, TableCatalog}
@@ -65,6 +67,39 @@ case class PaimonStrategy(spark: SparkSession)
     case RenameTagCommand(PaimonCatalogAndIdentifier(catalog, ident), sourceTag, targetTag) =>
       RenameTagExec(catalog, ident, sourceTag, targetTag) :: Nil
 
+    case CreatePaimonView(
+          ResolvedIdentifier(viewCatalog: SupportView, ident),
+          queryText,
+          query,
+          columnAliases,
+          columnComments,
+          queryColumnNames,
+          comment,
+          properties,
+          allowExisting,
+          replace) =>
+      CreatePaimonViewExec(
+        viewCatalog,
+        ident,
+        queryText,
+        query.schema,
+        columnAliases,
+        columnComments,
+        queryColumnNames,
+        comment,
+        properties,
+        allowExisting,
+        replace) :: Nil
+
+    case DropPaimonView(ResolvedIdentifier(viewCatalog: SupportView, ident), ifExists) =>
+      DropPaimonViewExec(viewCatalog, ident, ifExists) :: Nil
+
+    // A new member was added to ResolvedNamespace in Spark 4.0, so unapply pattern
+    // matching is not used here, to stay compatible across multiple Spark versions.
+    case ShowPaimonViews(r: ResolvedNamespace, pattern, output)
+        if r.catalog.isInstanceOf[SupportView] =>
+      ShowPaimonViewsExec(output, r.catalog.asInstanceOf[SupportView], r.namespace, pattern) :: Nil
+
     case _ => Nil
   }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala
new file mode 100644
index 000000000..7a4b907c7
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.catalog.SupportView
+import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow}
+import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class CreatePaimonViewExec(
+    catalog: SupportView,
+    ident: Identifier,
+    queryText: String,
+    viewSchema: StructType,
+    columnAliases: Seq[String],
+    columnComments: Seq[Option[String]],
+    queryColumnNames: Seq[String],
+    comment: Option[String],
+    properties: Map[String, String],
+    allowExisting: Boolean,
+    replace: Boolean
+) extends PaimonLeafV2CommandExec {
+
+  override def output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    if (columnAliases.nonEmpty || columnComments.nonEmpty || queryColumnNames.nonEmpty) {
+      throw new UnsupportedOperationException(
+        "columnAliases, columnComments and queryColumnNames are not supported yet")
+    }
+
+    // Note: for replace, we simply drop then create; this operation is non-atomic.
+    if (replace) {
+      catalog.dropView(ident, true)
+    }
+
+    catalog.createView(
+      ident,
+      viewSchema,
+      queryText,
+      comment.orNull,
+      properties.asJava,
+      allowExisting)
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"CreatePaimonViewExec: $ident"
+  }
+}
+
+case class DropPaimonViewExec(catalog: SupportView, ident: Identifier, ifExists: Boolean)
+  extends PaimonLeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.dropView(ident, ifExists)
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"DropPaimonViewExec: $ident"
+  }
+}
+
+case class ShowPaimonViewsExec(
+    output: Seq[Attribute],
+    catalog: SupportView,
+    namespace: Seq[String],
+    pattern: Option[String])
+  extends PaimonLeafV2CommandExec {
+
+  override protected def run(): Seq[InternalRow] = {
+    val rows = new ArrayBuffer[InternalRow]()
+    catalog.listViews(namespace.toArray).asScala.map {
+      viewName =>
+        if (pattern.forall(StringUtils.filterPattern(Seq(viewName), _).nonEmpty)) {
+          rows += new GenericInternalRow(
+            Array(
+              UTF8String.fromString(namespace.mkString(".")),
+              UTF8String.fromString(viewName),
+              false))
+        }
+    }
+    rows.toSeq
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"ShowPaimonViewsExec: $namespace"
+  }
+}
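
A side note on ShowPaimonViewsExec: the LIKE filtering reuses Spark's StringUtils.filterPattern, whose patterns treat '*' as a wildcard and '|' as a separator between alternatives. A small sketch (the sample names are hypothetical):

    import org.apache.spark.sql.catalyst.util.StringUtils

    val names = Seq("va", "vab", "vc")
    names.filter(n => StringUtils.filterPattern(Seq(n), "va*").nonEmpty)  // Seq("va", "vab")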
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index 4fe217ee0..6f47a77ef 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.extensions
 
-import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable}
+import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver}
 import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueries}
 import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
 import org.apache.paimon.spark.execution.PaimonStrategy
@@ -37,6 +37,7 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
     // analyzer extensions
     extensions.injectResolutionRule(spark => new PaimonAnalysis(spark))
     extensions.injectResolutionRule(spark => PaimonProcedureResolver(spark))
+    extensions.injectResolutionRule(spark => PaimonViewResolver(spark))
     extensions.injectResolutionRule(spark => PaimonIncompatibleResolutionRules(spark))
 
     extensions.injectPostHocResolutionRule(spark => PaimonPostHocResolutionRules(spark))
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
index 5befb88da..6ebab0384 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark
 
-import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LeafParsedStatement}
+import org.apache.spark.sql.catalyst.plans.logical.{BinaryCommand, LeafCommand, LeafParsedStatement, UnaryCommand}
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
 
@@ -30,6 +30,9 @@ package object leafnode {
 
   trait PaimonLeafCommand extends LeafCommand
 
-  trait PaimonLeafV2CommandExec extends LeafV2CommandExec
+  trait PaimonUnaryCommand extends UnaryCommand
+
+  trait PaimonBinaryCommand extends BinaryCommand
 
+  trait PaimonLeafV2CommandExec extends LeafV2CommandExec
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
index dd0a48159..9ece18693 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
@@ -23,7 +23,7 @@ import org.antlr.v4.runtime.atn.PredictionMode
 import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
 import org.antlr.v4.runtime.tree.TerminalNodeImpl
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
@@ -61,7 +61,7 @@ class PaimonSparkSqlExtensionsParser(val delegate: ParserInterface)
       parse(sqlTextAfterSubstitution)(parser => astBuilder.visit(parser.singleStatement()))
         .asInstanceOf[LogicalPlan]
     } else {
-      delegate.parsePlan(sqlText)
+      RewritePaimonViewCommands(SparkSession.active).apply(delegate.parsePlan(sqlText))
     }
   }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
index b864894e7..a1289a5f0 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
@@ -19,7 +19,7 @@
 package org.apache.spark.sql.catalyst.parser.extensions
 
 import org.apache.paimon.spark.catalyst.plans.logical
-import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions}
+import org.apache.paimon.spark.catalyst.plans.logical._
 import org.apache.paimon.utils.TimeUtils
 
 import org.antlr.v4.runtime._
@@ -212,5 +212,16 @@ object CurrentOrigin {
   def get: Origin = value.get()
   def set(o: Origin): Unit = value.set(o)
   def reset(): Unit = value.set(Origin())
+
+  def withOrigin[A](o: Origin)(f: => A): A = {
+    // remember the previous one so it can be reset to it;
+    // this way withOrigin can be recursive
+    val previous = get
+    set(o)
+    val ret =
+      try f
+      finally { set(previous) }
+    ret
+  }
 }
 /* Apache Spark copy end */
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
new file mode 100644
index 000000000..f69e5d920
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.apache.paimon.spark.catalog.SupportView
+import org.apache.paimon.spark.catalyst.plans.logical.{CreatePaimonView, DropPaimonView, ResolvedIdentifier, ShowPaimonViews}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, ResolvedNamespace, UnresolvedIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
+
+case class RewritePaimonViewCommands(spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with LookupCatalog {
+
+  protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+
+    // A new member was added to CreateView in Spark 4.0, so unapply pattern
+    // matching is not used here, to stay compatible across multiple Spark versions.
+    case c: CreateView =>
+      ResolvedIdent
+        .unapply(c.child)
+        .map {
+          resolved =>
+            CreatePaimonView(
+              child = resolved,
+              queryText = c.originalText.get,
+              query = CTESubstitution.apply(c.query),
+              columnAliases = c.userSpecifiedColumns.map(_._1),
+              columnComments = c.userSpecifiedColumns.map(_._2.orElse(None)),
+              comment = c.comment,
+              properties = c.properties,
+              allowExisting = c.allowExisting,
+              replace = c.replace
+            )
+        }
+        .getOrElse(c)
+
+    case DropView(ResolvedIdent(resolved), ifExists: Boolean) =>
+      DropPaimonView(resolved, ifExists)
+
+    case ShowViews(_, pattern, output) if catalogManager.currentCatalog.isInstanceOf[SupportView] =>
+      ShowPaimonViews(
+        ResolvedNamespace(catalogManager.currentCatalog, catalogManager.currentNamespace),
+        pattern,
+        output)
+  }
+
+  private object ResolvedIdent {
+    def unapply(unresolved: Any): Option[ResolvedIdentifier] = unresolved match {
+      case UnresolvedIdentifier(CatalogAndIdentifier(viewCatalog: SupportView, ident), _) =>
+        Some(ResolvedIdentifier(viewCatalog, ident))
+      case _ =>
+        None
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index 33b993160..56922ae2a 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -241,12 +241,12 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
           spark.sql(s"CREATE DATABASE paimon_db")
           spark.sql(s"USE paimon_db")
           spark.sql(s"CREATE TABLE paimon_tbl (id int, name string, dt string) using paimon")
-          // Currently, only spark_catalog supports create other table or view
+          // Only spark_catalog supports creating other tables
           if (catalogName.equals(sparkCatalogName)) {
             spark.sql(s"CREATE TABLE parquet_tbl (id int, name string, dt string) using parquet")
             spark.sql(s"CREATE VIEW parquet_tbl_view AS SELECT * FROM parquet_tbl")
-            spark.sql(s"CREATE VIEW paimon_tbl_view AS SELECT * FROM paimon_tbl")
           }
+          spark.sql(s"CREATE VIEW paimon_tbl_view AS SELECT * FROM paimon_tbl")
           spark.sql(s"USE default")
           spark.sql(s"DROP DATABASE paimon_db CASCADE")
       }
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala
new file mode 100644
index 000000000..39ed8e8a7
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonHiveTestBase
+
+import org.apache.spark.sql.Row
+
+abstract class PaimonViewTestBase extends PaimonHiveTestBase {
+
+  test("Paimon View: create and drop view") {
+    Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
+      catalogName =>
+        {
+          sql(s"USE $catalogName")
+          withDatabase("test_db") {
+            sql("CREATE DATABASE test_db")
+            sql("USE test_db")
+            withTable("t") {
+              withView("v1") {
+                sql("CREATE TABLE t (id INT) USING paimon")
+                sql("INSERT INTO t VALUES (1), (2)")
+
+                sql("CREATE VIEW v1 AS SELECT * FROM t")
+                checkAnswer(sql("SHOW VIEWS"), Seq(Row("test_db", "v1", false)))
+                checkAnswer(sql("SELECT * FROM v1"), Seq(Row(1), Row(2)))
+                checkAnswer(
+                  sql("SELECT * FROM v1 WHERE id >= (SELECT max(id) FROM v1)"),
+                  Seq(Row(2)))
+
+                // test drop view
+                sql("DROP VIEW IF EXISTS v1")
+                checkAnswer(sql("SHOW VIEWS"), Seq())
+                sql("CREATE VIEW v1 AS SELECT * FROM t WHERE id > 1")
+                checkAnswer(sql("SHOW VIEWS"), Seq(Row("test_db", "v1", false)))
+                checkAnswer(sql("SELECT * FROM v1"), Seq(Row(2)))
+
+                // test create or replace view
+                intercept[Exception] {
+                  sql("CREATE VIEW v1 AS SELECT * FROM t WHERE id < 2")
+                }
+                sql("CREATE OR REPLACE VIEW v1 AS SELECT * FROM t WHERE id < 2")
+                checkAnswer(sql("SELECT * FROM v1"), Seq(Row(1)))
+              }
+            }
+          }
+        }
+    }
+  }
+
+  test("Paimon View: show views") {
+    Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
+      catalogName =>
+        {
+          sql(s"USE $catalogName")
+          withDatabase("test_db") {
+            sql("CREATE DATABASE test_db")
+            sql("USE test_db")
+            withTable("t") {
+              withView("va", "vab", "vc") {
+                sql("CREATE TABLE t (id INT) USING paimon")
+                sql("CREATE VIEW va AS SELECT * FROM t")
+                sql("CREATE VIEW vab AS SELECT * FROM t")
+                sql("CREATE VIEW vc AS SELECT * FROM t")
+                checkAnswer(
+                  sql("SHOW VIEWS"),
+                  Seq(
+                    Row("test_db", "va", false),
+                    Row("test_db", "vab", false),
+                    Row("test_db", "vc", false)))
+                checkAnswer(
+                  sql("SHOW VIEWS LIKE 'va*'"),
+                  Seq(Row("test_db", "va", false), Row("test_db", "vab", false)))
+              }
+            }
+          }
+        }
+    }
+  }
+}
