This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d6a7030b [spark] Fix dynamicBucketAssignerParallelism conf (#2384)
3d6a7030b is described below

commit 3d6a7030bd64cfe5e0d339800f97f3ed62753bb6
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Nov 24 10:32:15 2023 +0800

    [spark] Fix dynamicBucketAssignerParallelism conf (#2384)
---
 .../spark/commands/WriteIntoPaimonTable.scala      | 11 +++--
 .../paimon/spark/sql/DynamicBucketTableTest.scala  | 55 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 3 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 24af34062..f9f9c3230 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -21,7 +21,7 @@ import 
org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
 import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.index.PartitionIndex
 import org.apache.paimon.options.Options
-import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite, 
SaveMode, SparkConnectorOptions, SparkRow}
+import org.apache.paimon.spark._
 import org.apache.paimon.spark.SparkUtils.createIOManager
 import org.apache.paimon.spark.schema.SparkSystemColumns
 import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, 
ROW_KIND_COL}
@@ -102,7 +102,12 @@ case class WriteIntoPaimonTable(
         case BucketMode.DYNAMIC =>
           val partitioned = if (primaryKeyCols.nonEmpty) {
             // Make sure that the records with the same bucket values is 
within a task.
-            withBucketCol.repartition(primaryKeyCols: _*)
+            val assignerParallelism = 
table.coreOptions.dynamicBucketAssignerParallelism
+            if (assignerParallelism != null) {
+              withBucketCol.repartition(assignerParallelism, primaryKeyCols: 
_*)
+            } else {
+              withBucketCol.repartition(primaryKeyCols: _*)
+            }
           } else {
             withBucketCol
           }
@@ -230,7 +235,7 @@ object WriteIntoPaimonTable {
       fileStoreTable: FileStoreTable,
       rowType: RowType,
       bucketColIndex: Int,
-      numSparkPartitions: Long,
+      numSparkPartitions: Int,
       toRow: ExpressionEncoder.Serializer[Row],
       fromRow: ExpressionEncoder.Deserializer[Row]
   ) extends BucketProcessor {
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
new file mode 100644
index 000000000..853bac387
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+class DynamicBucketTableTest extends PaimonSparkTestBase {
+
+  test(s"Paimon dynamic bucket table: write with assign parallelism") {
+    spark.sql(s"""
+                 |CREATE TABLE T (
+                 |  pk STRING,
+                 |  v STRING,
+                 |  pt STRING)
+                 |TBLPROPERTIES (
+                 |  'primary-key' = 'pk, pt',
+                 |  'bucket' = '-1',
+                 |  'dynamic-bucket.target-row-num'='3',
+                 |  'dynamic-bucket.assigner-parallelism'='3'
+                 |)
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    spark.sql(
+      "INSERT INTO T VALUES ('1', 'a', 'p'), ('2', 'b', 'p'), ('3', 'c', 'p'), 
('4', 'd', 'p'), ('5', 'e', 'p')")
+
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY pk"),
+      Row("1", "a", "p") :: Row("2", "b", "p") :: Row("3", "c", "p") :: 
Row("4", "d", "p") :: Row(
+        "5",
+        "e",
+        "p") :: Nil)
+
+    checkAnswer(
+      spark.sql("SELECT DISTINCT bucket FROM `T$FILES`"),
+      Row(0) :: Row(1) :: Row(2) :: Nil)
+  }
+}

Reply via email to