This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ec613701f [Spark] Supports CTAS (#2380)
ec613701f is described below

commit ec613701fb1232eaffe71d5e24464efddcac3862
Author: Yann Byron <[email protected]>
AuthorDate: Mon Nov 27 09:38:46 2023 +0800

    [Spark] Supports CTAS (#2380)
---
 paimon-spark/paimon-spark-3.3/pom.xml              | 34 +++++------
 .../shim/PaimonCreateTableAsSelectStrategy.scala   | 69 ++++++++++++++++++++++
 .../apache/paimon/spark/DataFrameWriteTest.scala   | 53 +++++++++++++++++
 .../org/apache/paimon/spark/sql/DDLTest.scala      | 59 ++++++++++++++++++
 .../extensions/PaimonSparkSessionExtensions.scala  |  4 +-
 .../execution/{datasources/v2 => }/CallExec.scala  |  3 +-
 ...SourceV2Strategy.scala => PaimonStrategy.scala} | 12 ++--
 .../v2/CallExec.scala => StrategyHelper.scala}     | 26 ++++----
 .../shim/PaimonCreateTableAsSelectStrategy.scala   | 67 +++++++++++++++++++++
 .../org/apache/paimon/spark/sql/DDLTest.scala      | 59 ++++++++++++++++++
 .../paimon/spark/sql/DataFrameWriteTest.scala      | 30 ++++++++++
 11 files changed, 380 insertions(+), 36 deletions(-)

diff --git a/paimon-spark/paimon-spark-3.3/pom.xml 
b/paimon-spark/paimon-spark-3.3/pom.xml
index d484dc88d..43e91f209 100644
--- a/paimon-spark/paimon-spark-3.3/pom.xml
+++ b/paimon-spark/paimon-spark-3.3/pom.xml
@@ -42,6 +42,22 @@ under the License.
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.12</artifactId>
@@ -130,24 +146,6 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-library</artifactId>
-            <version>${scala.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-reflect</artifactId>
-            <version>${scala.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-compiler</artifactId>
-            <version>${scala.version}</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.scalatest</groupId>
             <artifactId>scalatest_${scala.binary.version}</artifactId>
diff --git 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
new file mode 100644
index 000000000..6573c13e7
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.shim
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.spark.SparkCatalog
+
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.analysis.ResolvedDBObjectName
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec}
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog
+import org.apache.spark.sql.execution.{SparkPlan, StrategyHelper}
+import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+
+/**
+ * Spark 3.3 planner strategy for CREATE TABLE ... AS SELECT (CTAS) whose target resolves to a
+ * Paimon [[SparkCatalog]].
+ *
+ * Options whose keys are Paimon [[CoreOptions]] keys are merged into the table properties so they
+ * are stored with the created table; all other options are passed on only as write options.
+ * Any other plan yields Nil.
+ */
+case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
+  extends Strategy
+  with StrategyHelper {
+
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case CreateTableAsSelect(
+          ResolvedDBObjectName(catalog: SparkCatalog, ident),
+          parts,
+          query,
+          tableSpec: TableSpec,
+          options,
+          ifNotExists) =>
+      catalog match {
+        case _: StagingTableCatalog =>
+          // Staged (atomic) CTAS is not supported by this strategy.
+          throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
+        case _ =>
+          // A Set gives constant-time membership checks in the partition below.
+          val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSet
+          // Core options become table properties; the rest stay write-only options.
+          val (coreOptions, writeOptions) = options.partition {
+            case (key, _) => coreOptionKeys.contains(key)
+          }
+          val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ coreOptions)
+          CreateTableAsSelectExec(
+            catalog.asTableCatalog,
+            ident.asIdentifier,
+            parts,
+            query,
+            planLater(query),
+            qualifyLocInTableSpec(newTableSpec),
+            new CaseInsensitiveStringMap(writeOptions.asJava),
+            ifNotExists
+          ) :: Nil
+      }
+    case _ => Nil
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
new file mode 100644
index 000000000..af6fb8ff1
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.junit.jupiter.api.Assertions
+
+/** Tests for creating a Paimon table from a DataFrame via `DataFrameWriter.saveAsTable`. */
+class DataFrameWriteTest extends PaimonSparkTestBase {
+
+  test("Paimon: DataFrameWrite.saveAsTable") {
+
+    import testImplicits._
+
+    // saveAsTable into a new table is planned as a CTAS: core options must be persisted as table
+    // options, while write-only options must not be.
+    val df = Seq((1L, "x1"), (2L, "x2")).toDF("a", "b")
+    df.write
+      .format("paimon")
+      .mode("append")
+      .option("primary-key", "a")
+      .option("bucket", "-1")
+      .option("target-file-size", "256MB")
+      .option("write.merge-schema", "true")
+      .option("write.merge-schema.explicit-cast", "true")
+      .saveAsTable("test_ctas")
+
+    val paimonTable = loadTable("test_ctas")
+    val primaryKeys = paimonTable.primaryKeys()
+    Assertions.assertEquals(1, primaryKeys.size())
+    Assertions.assertEquals("a", primaryKeys.get(0))
+
+    // core options are persisted as table options
+    val tableOptions = paimonTable.options()
+    Assertions.assertEquals("-1", tableOptions.get("bucket"))
+    Assertions.assertEquals("256MB", tableOptions.get("target-file-size"))
+
+    // non-core (write-only) options must not be persisted
+    Seq("write.merge-schema", "write.merge-schema.explicit-cast").foreach {
+      key => Assertions.assertFalse(tableOptions.containsKey(key))
+    }
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
new file mode 100644
index 000000000..7187db29d
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.junit.jupiter.api.Assertions
+
+/** DDL tests for Paimon tables created through Spark SQL. */
+class DDLTest extends PaimonSparkTestBase {
+
+  import testImplicits._
+
+  test("Paimon: Create Table As Select") {
+    val sourceDf = Seq((1L, "x1", "2023"), (2L, "x2", "2023")).toDF("a", "b", "pt")
+    sourceDf.createOrReplaceTempView("source")
+
+    // Plain CTAS declares no primary keys or partitions, so the new table has none.
+    spark.sql("""
+                |CREATE TABLE t1 AS SELECT * FROM source
+                |""".stripMargin)
+    val t1 = loadTable("t1")
+    Assertions.assertTrue(t1.primaryKeys().isEmpty)
+    Assertions.assertTrue(t1.partitionKeys().isEmpty)
+
+    // CTAS with partitioning, primary keys and core options supplied as TBLPROPERTIES.
+    spark.sql(
+      """
+        |CREATE TABLE t2
+        |PARTITIONED BY (pt)
+        |TBLPROPERTIES ('bucket' = '5', 'primary-key' = 'a,pt', 'target-file-size' = '128MB')
+        |AS SELECT * FROM source
+        |""".stripMargin)
+    val t2 = loadTable("t2")
+    val t2PrimaryKeys = t2.primaryKeys()
+    Assertions.assertEquals(2, t2PrimaryKeys.size())
+    Assertions.assertTrue(t2PrimaryKeys.contains("a"))
+    Assertions.assertTrue(t2PrimaryKeys.contains("pt"))
+    val t2PartitionKeys = t2.partitionKeys()
+    Assertions.assertEquals(1, t2PartitionKeys.size())
+    Assertions.assertEquals("pt", t2PartitionKeys.get(0))
+
+    // core options given as table properties are kept as table options
+    val t2Options = t2.options()
+    Assertions.assertEquals("5", t2Options.get("bucket"))
+    Assertions.assertEquals("128MB", t2Options.get("target-file-size"))
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index bf84b8955..dafa8fca5 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.analysis.{CoerceArguments, 
PaimonAnalysis, PaimonDeleteTable, PaimonMergeInto, 
PaimonPostHocResolutionRules, PaimonUpdateTable, ResolveProcedures}
 import 
org.apache.spark.sql.catalyst.parser.extensions.PaimonSparkSqlExtensionsParser
 import org.apache.spark.sql.catalyst.plans.logical.PaimonTableValuedFunctions
-import 
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy
+import org.apache.spark.sql.execution.PaimonStrategy
 
 /** Spark session extension to extends the syntax and adds the rules. */
 class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
@@ -49,6 +49,6 @@ class PaimonSparkSessionExtensions extends 
(SparkSessionExtensions => Unit) {
     }
 
     // planner extensions
-    extensions.injectPlannerStrategy(spark => 
ExtendedDataSourceV2Strategy(spark))
+    extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CallExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/CallExec.scala
similarity index 92%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CallExec.scala
copy to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/CallExec.scala
index 492fe6027..d97464841 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CallExec.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/CallExec.scala
@@ -15,13 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution.datasources.v2
+package org.apache.spark.sql.execution
 
 import org.apache.paimon.spark.procedure.Procedure
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
 
 case class CallExec(output: Seq[Attribute], procedure: Procedure, input: 
InternalRow)
   extends LeafV2CommandExec {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategy.scala
similarity index 81%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategy.scala
index cde5816d8..9aa1f6428 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategy.scala
@@ -15,17 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution.datasources.v2
+package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, 
GenericInternalRow, PredicateHelper}
-import org.apache.spark.sql.catalyst.plans.logical.{CallCommand, LogicalPlan}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.catalyst.plans.logical.{CallCommand, 
CreateTableAsSelect, LogicalPlan, TableSpec}
+import org.apache.spark.sql.execution.shim.PaimonCreateTableAsSelectStrategy
 
-case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy 
with PredicateHelper {
+case class PaimonStrategy(spark: SparkSession) extends Strategy with 
PredicateHelper {
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+
+    case ctas: CreateTableAsSelect =>
+      PaimonCreateTableAsSelectStrategy(spark)(ctas)
+
     case c @ CallCommand(procedure, args) =>
       val input = buildInternalRow(args)
       CallExec(c.output, procedure, input) :: Nil
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CallExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/StrategyHelper.scala
similarity index 54%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CallExec.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/StrategyHelper.scala
index 492fe6027..11f2b261f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CallExec.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/StrategyHelper.scala
@@ -15,22 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution.datasources.v2
+package org.apache.spark.sql.execution
 
-import org.apache.paimon.spark.procedure.Procedure
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
+import org.apache.spark.sql.catalyst.plans.logical.TableSpec
+import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.util.truncatedString
+/** Helpers shared by Paimon planner strategies, e.g. for qualifying table locations. */
+trait StrategyHelper {
 
-case class CallExec(output: Seq[Attribute], procedure: Procedure, input: InternalRow)
-  extends LeafV2CommandExec {
+  /** Session whose shared state provides the warehouse path and the Hadoop configuration. */
+  def spark: SparkSession
 
-  override protected def run(): Seq[InternalRow] = {
-    procedure.call(input)
+  /**
+   * Qualifies `location` against the session's warehouse path and Hadoop configuration using
+   * `CatalogUtils.makeQualifiedDBObjectPath`.
+   */
+  protected def makeQualifiedDBObjectPath(location: String): String = {
+    val warehousePath = spark.sharedState.conf.get(WAREHOUSE_PATH)
+    val hadoopConf = spark.sharedState.hadoopConf
+    CatalogUtils.makeQualifiedDBObjectPath(warehousePath, location, hadoopConf)
   }
 
-  override def simpleString(maxFields: Int): String = {
-    s"CallExec${truncatedString(output, "[", ", ", "]", maxFields)} ${procedure.description}"
+  /** Returns a copy of `tableSpec` whose location, if present, has been qualified. */
+  protected def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
+    val qualifiedLocation = tableSpec.location.map(makeQualifiedDBObjectPath)
+    tableSpec.copy(location = qualifiedLocation)
   }
+
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
new file mode 100644
index 000000000..1457d39b9
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.shim
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.spark.SparkCatalog
+
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec}
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog
+import org.apache.spark.sql.execution.{SparkPlan, StrategyHelper}
+import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
+
+import scala.collection.JavaConverters._
+
+/**
+ * Planner strategy for CREATE TABLE ... AS SELECT (CTAS) whose target resolves to a Paimon
+ * [[SparkCatalog]].
+ *
+ * Options whose keys are Paimon [[CoreOptions]] keys are merged into the table properties so they
+ * are stored with the created table; all other options are passed on only as write options.
+ * Any other plan yields Nil.
+ */
+case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
+  extends Strategy
+  with StrategyHelper {
+
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case CreateTableAsSelect(
+          ResolvedIdentifier(catalog: SparkCatalog, ident),
+          parts,
+          query,
+          tableSpec: TableSpec,
+          options,
+          ifNotExists,
+          // NOTE(review): presumably `isAnalyzed`; confirm for the target Spark version.
+          true) =>
+      catalog match {
+        case _: StagingTableCatalog =>
+          // Staged (atomic) CTAS is not supported by this strategy.
+          throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
+        case _ =>
+          // A Set gives constant-time membership checks in the partition below.
+          val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSet
+          // Core options become table properties; the rest stay write-only options.
+          val (coreOptions, writeOptions) = options.partition {
+            case (key, _) => coreOptionKeys.contains(key)
+          }
+          val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ coreOptions)
+          CreateTableAsSelectExec(
+            catalog.asTableCatalog,
+            ident,
+            parts,
+            query,
+            qualifyLocInTableSpec(newTableSpec),
+            writeOptions,
+            ifNotExists) :: Nil
+      }
+    case _ => Nil
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
new file mode 100644
index 000000000..7187db29d
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.junit.jupiter.api.Assertions
+
+/** DDL tests for Paimon tables created through Spark SQL. */
+class DDLTest extends PaimonSparkTestBase {
+
+  import testImplicits._
+
+  test("Paimon: Create Table As Select") {
+    val sourceDf = Seq((1L, "x1", "2023"), (2L, "x2", "2023")).toDF("a", "b", "pt")
+    sourceDf.createOrReplaceTempView("source")
+
+    // Plain CTAS declares no primary keys or partitions, so the new table has none.
+    spark.sql("""
+                |CREATE TABLE t1 AS SELECT * FROM source
+                |""".stripMargin)
+    val t1 = loadTable("t1")
+    Assertions.assertTrue(t1.primaryKeys().isEmpty)
+    Assertions.assertTrue(t1.partitionKeys().isEmpty)
+
+    // CTAS with partitioning, primary keys and core options supplied as TBLPROPERTIES.
+    spark.sql(
+      """
+        |CREATE TABLE t2
+        |PARTITIONED BY (pt)
+        |TBLPROPERTIES ('bucket' = '5', 'primary-key' = 'a,pt', 'target-file-size' = '128MB')
+        |AS SELECT * FROM source
+        |""".stripMargin)
+    val t2 = loadTable("t2")
+    val t2PrimaryKeys = t2.primaryKeys()
+    Assertions.assertEquals(2, t2PrimaryKeys.size())
+    Assertions.assertTrue(t2PrimaryKeys.contains("a"))
+    Assertions.assertTrue(t2PrimaryKeys.contains("pt"))
+    val t2PartitionKeys = t2.partitionKeys()
+    Assertions.assertEquals(1, t2PartitionKeys.size())
+    Assertions.assertEquals("pt", t2PartitionKeys.get(0))
+
+    // core options given as table properties are kept as table options
+    val t2Options = t2.options()
+    Assertions.assertEquals("5", t2Options.get("bucket"))
+    Assertions.assertEquals("128MB", t2Options.get("target-file-size"))
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index 5e7493629..3de2bc0ec 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -20,11 +20,41 @@ package org.apache.paimon.spark.sql
 import org.apache.paimon.spark.PaimonSparkTestBase
 
 import org.apache.spark.sql.Row
+import org.junit.jupiter.api.Assertions
 
 import java.sql.Date
 
 class DataFrameWriteTest extends PaimonSparkTestBase {
 
+  test("Paimon: DataFrameWrite.saveAsTable") {
+
+    import testImplicits._
+
+    Seq((1L, "x1"), (2L, "x2"))
+      .toDF("a", "b")
+      .write
+      .format("paimon")
+      .mode("append")
+      .option("primary-key", "a")
+      .option("bucket", "-1")
+      .option("target-file-size", "256MB")
+      .option("write.merge-schema", "true")
+      .option("write.merge-schema.explicit-cast", "true")
+      .saveAsTable("test_ctas")
+
+    val paimonTable = loadTable("test_ctas")
+    Assertions.assertEquals(1, paimonTable.primaryKeys().size())
+    Assertions.assertEquals("a", paimonTable.primaryKeys().get(0))
+
+    // check all the core options
+    Assertions.assertEquals("-1", paimonTable.options().get("bucket"))
+    Assertions.assertEquals("256MB", 
paimonTable.options().get("target-file-size"))
+
+    // non-core options should not be here.
+    
Assertions.assertFalse(paimonTable.options().containsKey("write.merge-schema"))
+    
Assertions.assertFalse(paimonTable.options().containsKey("write.merge-schema.explicit-cast"))
+  }
+
   withPk.foreach {
     hasPk =>
       bucketModes.foreach {

Reply via email to