This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ec613701f [Spark] Supports CTAS (#2380)
ec613701f is described below
commit ec613701fb1232eaffe71d5e24464efddcac3862
Author: Yann Byron <[email protected]>
AuthorDate: Mon Nov 27 09:38:46 2023 +0800
[Spark] Supports CTAS (#2380)
---
paimon-spark/paimon-spark-3.3/pom.xml | 34 +++++------
.../shim/PaimonCreateTableAsSelectStrategy.scala | 69 ++++++++++++++++++++++
.../apache/paimon/spark/DataFrameWriteTest.scala | 53 +++++++++++++++++
.../org/apache/paimon/spark/sql/DDLTest.scala | 59 ++++++++++++++++++
.../extensions/PaimonSparkSessionExtensions.scala | 4 +-
.../execution/{datasources/v2 => }/CallExec.scala | 3 +-
...SourceV2Strategy.scala => PaimonStrategy.scala} | 12 ++--
.../v2/CallExec.scala => StrategyHelper.scala} | 26 ++++----
.../shim/PaimonCreateTableAsSelectStrategy.scala | 67 +++++++++++++++++++++
.../org/apache/paimon/spark/sql/DDLTest.scala | 59 ++++++++++++++++++
.../paimon/spark/sql/DataFrameWriteTest.scala | 30 ++++++++++
11 files changed, 380 insertions(+), 36 deletions(-)
diff --git a/paimon-spark/paimon-spark-3.3/pom.xml
b/paimon-spark/paimon-spark-3.3/pom.xml
index d484dc88d..43e91f209 100644
--- a/paimon-spark/paimon-spark-3.3/pom.xml
+++ b/paimon-spark/paimon-spark-3.3/pom.xml
@@ -42,6 +42,22 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
@@ -130,24 +146,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- <version>${scala.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- <version>${scala.version}</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
diff --git
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
new file mode 100644
index 000000000..6573c13e7
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.shim
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.spark.SparkCatalog
+
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.analysis.ResolvedDBObjectName
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect,
LogicalPlan, TableSpec}
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog
+import org.apache.spark.sql.execution.{SparkPlan, StrategyHelper}
+import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+
+/**
+ * Planner strategy (Spark 3.3 shim) that plans a `CreateTableAsSelect` whose target catalog is a
+ * Paimon [[SparkCatalog]] as a `CreateTableAsSelectExec`.
+ *
+ * Entries of the statement's `options` whose key is one of the keys reported by
+ * `CoreOptions.getOptions` are merged into the table properties; all remaining entries are
+ * handed to the exec node as write options.
+ */
+case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
+  extends Strategy
+  with StrategyHelper {
+
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  /**
+   * @param plan
+   *   logical plan handed over by the planner
+   * @return
+   *   a single `CreateTableAsSelectExec` for a CTAS on a Paimon catalog, `Nil` for any other plan
+   * @throws RuntimeException
+   *   if the matched catalog is also a `StagingTableCatalog`
+   */
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case CreateTableAsSelect(
+          ResolvedDBObjectName(catalog: SparkCatalog, ident),
+          parts,
+          query,
+          tableSpec: TableSpec,
+          options,
+          ifNotExists) =>
+      catalog match {
+        case _: StagingTableCatalog =>
+          throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
+        case _ =>
+          // A Set keeps every membership check cheap; a Seq would be scanned once per option.
+          val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSet
+          val (coreOptions, writeOptions) = options.partition {
+            case (key, _) => coreOptionKeys.contains(key)
+          }
+          // Core options become table properties so that they are stored with the new table.
+          val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ coreOptions)
+          CreateTableAsSelectExec(
+            catalog.asTableCatalog,
+            ident.asIdentifier,
+            parts,
+            query,
+            planLater(query),
+            qualifyLocInTableSpec(newTableSpec),
+            new CaseInsensitiveStringMap(writeOptions.asJava),
+            ifNotExists
+          ) :: Nil
+      }
+    case _ => Nil
+  }
+}
diff --git
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
new file mode 100644
index 000000000..af6fb8ff1
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.junit.jupiter.api.Assertions
+
+/** Verifies which DataFrame writer options end up on a table created through `saveAsTable`. */
+class DataFrameWriteTest extends PaimonSparkTestBase {
+
+  test("Paimon: DataFrameWrite.saveAsTable") {
+
+    import testImplicits._
+
+    val input = Seq((1L, "x1"), (2L, "x2")).toDF("a", "b")
+
+    // Core (table-level) options and write-only options are supplied side by side.
+    val writerOptions = Seq(
+      "primary-key" -> "a",
+      "bucket" -> "-1",
+      "target-file-size" -> "256MB",
+      "write.merge-schema" -> "true",
+      "write.merge-schema.explicit-cast" -> "true"
+    )
+    val baseWriter = input.write.format("paimon").mode("append")
+    val configuredWriter = writerOptions.foldLeft(baseWriter) {
+      case (writer, (key, value)) => writer.option(key, value)
+    }
+    configuredWriter.saveAsTable("test_ctas")
+
+    val created = loadTable("test_ctas")
+    Assertions.assertEquals(1, created.primaryKeys().size())
+    Assertions.assertEquals("a", created.primaryKeys().get(0))
+
+    // the core options must be stored on the table
+    Assertions.assertEquals("-1", created.options().get("bucket"))
+    Assertions.assertEquals("256MB", created.options().get("target-file-size"))
+
+    // the write-only options must not be stored on the table
+    Assertions.assertFalse(created.options().containsKey("write.merge-schema"))
+    Assertions.assertFalse(created.options().containsKey("write.merge-schema.explicit-cast"))
+  }
+
+}
diff --git
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
new file mode 100644
index 000000000..7187db29d
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.junit.jupiter.api.Assertions
+
+/** Verifies `CREATE TABLE ... AS SELECT` against Paimon tables. */
+class DDLTest extends PaimonSparkTestBase {
+
+  import testImplicits._
+
+  test("Paimon: Create Table As Select") {
+    val sourceRows = Seq((1L, "x1", "2023"), (2L, "x2", "2023"))
+    sourceRows.toDF("a", "b", "pt").createOrReplaceTempView("source")
+
+    // Plain CTAS: the created table has neither primary keys nor partition keys.
+    val plainCtas =
+      """
+        |CREATE TABLE t1 AS SELECT * FROM source
+        |""".stripMargin
+    spark.sql(plainCtas)
+    val plainTable = loadTable("t1")
+    Assertions.assertTrue(plainTable.primaryKeys().isEmpty)
+    Assertions.assertTrue(plainTable.partitionKeys().isEmpty)
+
+    // CTAS with a partition column and table properties.
+    val partitionedCtas =
+      """
+        |CREATE TABLE t2
+        |PARTITIONED BY (pt)
+        |TBLPROPERTIES ('bucket' = '5', 'primary-key' = 'a,pt', 'target-file-size' = '128MB')
+        |AS SELECT * FROM source
+        |""".stripMargin
+    spark.sql(partitionedCtas)
+    val partitionedTable = loadTable("t2")
+
+    Assertions.assertEquals(2, partitionedTable.primaryKeys().size())
+    Assertions.assertTrue(partitionedTable.primaryKeys().contains("a"))
+    Assertions.assertTrue(partitionedTable.primaryKeys().contains("pt"))
+    Assertions.assertEquals(1, partitionedTable.partitionKeys().size())
+    Assertions.assertEquals("pt", partitionedTable.partitionKeys().get(0))
+
+    // the core options from TBLPROPERTIES must be stored on the table
+    Assertions.assertEquals("5", partitionedTable.options().get("bucket"))
+    Assertions.assertEquals("128MB", partitionedTable.options().get("target-file-size"))
+  }
+
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index bf84b8955..dafa8fca5 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.analysis.{CoerceArguments,
PaimonAnalysis, PaimonDeleteTable, PaimonMergeInto,
PaimonPostHocResolutionRules, PaimonUpdateTable, ResolveProcedures}
import
org.apache.spark.sql.catalyst.parser.extensions.PaimonSparkSqlExtensionsParser
import org.apache.spark.sql.catalyst.plans.logical.PaimonTableValuedFunctions
-import
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy
+import org.apache.spark.sql.execution.PaimonStrategy
/** Spark session extension to extends the syntax and adds the rules. */
class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
@@ -49,6 +49,6 @@ class PaimonSparkSessionExtensions extends
(SparkSessionExtensions => Unit) {
}
// planner extensions
- extensions.injectPlannerStrategy(spark =>
ExtendedDataSourceV2Strategy(spark))
+ extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CallExec.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/CallExec.scala
similarity index 92%
copy from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CallExec.scala
copy to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/CallExec.scala
index 492fe6027..d97464841 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CallExec.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/CallExec.scala
@@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources.v2
+package org.apache.spark.sql.execution
import org.apache.paimon.spark.procedure.Procedure
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
case class CallExec(output: Seq[Attribute], procedure: Procedure, input:
InternalRow)
extends LeafV2CommandExec {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategy.scala
similarity index 81%
rename from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
rename to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategy.scala
index cde5816d8..9aa1f6428 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategy.scala
@@ -15,17 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources.v2
+package org.apache.spark.sql.execution
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression,
GenericInternalRow, PredicateHelper}
-import org.apache.spark.sql.catalyst.plans.logical.{CallCommand, LogicalPlan}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.catalyst.plans.logical.{CallCommand,
CreateTableAsSelect, LogicalPlan, TableSpec}
+import org.apache.spark.sql.execution.shim.PaimonCreateTableAsSelectStrategy
-case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy
with PredicateHelper {
+case class PaimonStrategy(spark: SparkSession) extends Strategy with
PredicateHelper {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+
+ case ctas: CreateTableAsSelect =>
+ PaimonCreateTableAsSelectStrategy(spark)(ctas)
+
case c @ CallCommand(procedure, args) =>
val input = buildInternalRow(args)
CallExec(c.output, procedure, input) :: Nil
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CallExec.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/StrategyHelper.scala
similarity index 54%
rename from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CallExec.scala
rename to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/StrategyHelper.scala
index 492fe6027..11f2b261f 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CallExec.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/StrategyHelper.scala
@@ -15,22 +15,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources.v2
+package org.apache.spark.sql.execution
-import org.apache.paimon.spark.procedure.Procedure
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
+import org.apache.spark.sql.catalyst.plans.logical.TableSpec
+import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.util.truncatedString
+trait StrategyHelper {
-case class CallExec(output: Seq[Attribute], procedure: Procedure, input:
InternalRow)
- extends LeafV2CommandExec {
+ def spark: SparkSession
- override protected def run(): Seq[InternalRow] = {
- procedure.call(input)
+ protected def makeQualifiedDBObjectPath(location: String): String = {
+ CatalogUtils.makeQualifiedDBObjectPath(
+ spark.sharedState.conf.get(WAREHOUSE_PATH),
+ location,
+ spark.sharedState.hadoopConf)
}
- override def simpleString(maxFields: Int): String = {
- s"CallExec${truncatedString(output, "[", ", ", "]", maxFields)}
${procedure.description}"
+ protected def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
+ tableSpec.copy(location =
tableSpec.location.map(makeQualifiedDBObjectPath(_)))
}
+
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
new file mode 100644
index 000000000..1457d39b9
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.shim
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.spark.SparkCatalog
+
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect,
LogicalPlan, TableSpec}
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog
+import org.apache.spark.sql.execution.{SparkPlan, StrategyHelper}
+import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
+
+import scala.collection.JavaConverters._
+
+/**
+ * Planner strategy (paimon-spark-common variant) that plans a `CreateTableAsSelect` whose target
+ * catalog is a Paimon [[SparkCatalog]] as a `CreateTableAsSelectExec`.
+ *
+ * Entries of the statement's `options` whose key is one of the keys reported by
+ * `CoreOptions.getOptions` are merged into the table properties; all remaining entries are
+ * handed to the exec node as write options.
+ */
+case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
+  extends Strategy
+  with StrategyHelper {
+
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  /**
+   * @param plan
+   *   logical plan handed over by the planner
+   * @return
+   *   a single `CreateTableAsSelectExec` for a matching CTAS on a Paimon catalog, `Nil` for any
+   *   other plan
+   * @throws RuntimeException
+   *   if the matched catalog is also a `StagingTableCatalog`
+   */
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    // NOTE(review): the trailing `true` presumably matches the plan's "already analyzed" flag;
+    // confirm against the CreateTableAsSelect definition of the targeted Spark version.
+    case CreateTableAsSelect(
+          ResolvedIdentifier(catalog: SparkCatalog, ident),
+          parts,
+          query,
+          tableSpec: TableSpec,
+          options,
+          ifNotExists,
+          true) =>
+      catalog match {
+        case _: StagingTableCatalog =>
+          throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
+        case _ =>
+          // A Set keeps every membership check cheap; a Seq would be scanned once per option.
+          val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSet
+          val (coreOptions, writeOptions) = options.partition {
+            case (key, _) => coreOptionKeys.contains(key)
+          }
+          // Core options become table properties so that they are stored with the new table.
+          val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ coreOptions)
+          CreateTableAsSelectExec(
+            catalog.asTableCatalog,
+            ident,
+            parts,
+            query,
+            qualifyLocInTableSpec(newTableSpec),
+            writeOptions,
+            ifNotExists) :: Nil
+      }
+    case _ => Nil
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
new file mode 100644
index 000000000..7187db29d
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTest.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.junit.jupiter.api.Assertions
+
+/** Verifies `CREATE TABLE ... AS SELECT` against Paimon tables. */
+class DDLTest extends PaimonSparkTestBase {
+
+  import testImplicits._
+
+  test("Paimon: Create Table As Select") {
+    val sourceRows = Seq((1L, "x1", "2023"), (2L, "x2", "2023"))
+    sourceRows.toDF("a", "b", "pt").createOrReplaceTempView("source")
+
+    // Plain CTAS: the created table has neither primary keys nor partition keys.
+    val plainCtas =
+      """
+        |CREATE TABLE t1 AS SELECT * FROM source
+        |""".stripMargin
+    spark.sql(plainCtas)
+    val plainTable = loadTable("t1")
+    Assertions.assertTrue(plainTable.primaryKeys().isEmpty)
+    Assertions.assertTrue(plainTable.partitionKeys().isEmpty)
+
+    // CTAS with a partition column and table properties.
+    val partitionedCtas =
+      """
+        |CREATE TABLE t2
+        |PARTITIONED BY (pt)
+        |TBLPROPERTIES ('bucket' = '5', 'primary-key' = 'a,pt', 'target-file-size' = '128MB')
+        |AS SELECT * FROM source
+        |""".stripMargin
+    spark.sql(partitionedCtas)
+    val partitionedTable = loadTable("t2")
+
+    Assertions.assertEquals(2, partitionedTable.primaryKeys().size())
+    Assertions.assertTrue(partitionedTable.primaryKeys().contains("a"))
+    Assertions.assertTrue(partitionedTable.primaryKeys().contains("pt"))
+    Assertions.assertEquals(1, partitionedTable.partitionKeys().size())
+    Assertions.assertEquals("pt", partitionedTable.partitionKeys().get(0))
+
+    // the core options from TBLPROPERTIES must be stored on the table
+    Assertions.assertEquals("5", partitionedTable.options().get("bucket"))
+    Assertions.assertEquals("128MB", partitionedTable.options().get("target-file-size"))
+  }
+
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index 5e7493629..3de2bc0ec 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -20,11 +20,41 @@ package org.apache.paimon.spark.sql
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.Row
+import org.junit.jupiter.api.Assertions
import java.sql.Date
class DataFrameWriteTest extends PaimonSparkTestBase {
+  // Verifies which DataFrame writer options end up on a table created through saveAsTable.
+  test("Paimon: DataFrameWrite.saveAsTable") {
+
+    import testImplicits._
+
+    val input = Seq((1L, "x1"), (2L, "x2")).toDF("a", "b")
+
+    // Core (table-level) options and write-only options are supplied side by side.
+    val writerOptions = Seq(
+      "primary-key" -> "a",
+      "bucket" -> "-1",
+      "target-file-size" -> "256MB",
+      "write.merge-schema" -> "true",
+      "write.merge-schema.explicit-cast" -> "true"
+    )
+    val baseWriter = input.write.format("paimon").mode("append")
+    val configuredWriter = writerOptions.foldLeft(baseWriter) {
+      case (writer, (key, value)) => writer.option(key, value)
+    }
+    configuredWriter.saveAsTable("test_ctas")
+
+    val created = loadTable("test_ctas")
+    Assertions.assertEquals(1, created.primaryKeys().size())
+    Assertions.assertEquals("a", created.primaryKeys().get(0))
+
+    // the core options must be stored on the table
+    Assertions.assertEquals("-1", created.options().get("bucket"))
+    Assertions.assertEquals("256MB", created.options().get("target-file-size"))
+
+    // the write-only options must not be stored on the table
+    Assertions.assertFalse(created.options().containsKey("write.merge-schema"))
+    Assertions.assertFalse(created.options().containsKey("write.merge-schema.explicit-cast"))
+  }
+
withPk.foreach {
hasPk =>
bucketModes.foreach {