This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3d6a7030b [spark] Fix dynamicBucketAssignerParallelism conf (#2384)
3d6a7030b is described below
commit 3d6a7030bd64cfe5e0d339800f97f3ed62753bb6
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Nov 24 10:32:15 2023 +0800
[spark] Fix dynamicBucketAssignerParallelism conf (#2384)
---
.../spark/commands/WriteIntoPaimonTable.scala | 11 +++--
.../paimon/spark/sql/DynamicBucketTableTest.scala | 55 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 3 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 24af34062..f9f9c3230 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -21,7 +21,7 @@ import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.index.PartitionIndex
import org.apache.paimon.options.Options
-import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite, SaveMode, SparkConnectorOptions, SparkRow}
+import org.apache.paimon.spark._
import org.apache.paimon.spark.SparkUtils.createIOManager
import org.apache.paimon.spark.schema.SparkSystemColumns
import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, ROW_KIND_COL}
@@ -102,7 +102,12 @@ case class WriteIntoPaimonTable(
case BucketMode.DYNAMIC =>
val partitioned = if (primaryKeyCols.nonEmpty) {
// Make sure that the records with the same bucket values is within a task.
- withBucketCol.repartition(primaryKeyCols: _*)
+ val assignerParallelism = table.coreOptions.dynamicBucketAssignerParallelism
+ if (assignerParallelism != null) {
+ withBucketCol.repartition(assignerParallelism, primaryKeyCols: _*)
+ } else {
+ withBucketCol.repartition(primaryKeyCols: _*)
+ }
} else {
withBucketCol
}
@@ -230,7 +235,7 @@ object WriteIntoPaimonTable {
fileStoreTable: FileStoreTable,
rowType: RowType,
bucketColIndex: Int,
- numSparkPartitions: Long,
+ numSparkPartitions: Int,
toRow: ExpressionEncoder.Serializer[Row],
fromRow: ExpressionEncoder.Deserializer[Row]
) extends BucketProcessor {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
new file mode 100644
index 000000000..853bac387
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+class DynamicBucketTableTest extends PaimonSparkTestBase {
+
+ test(s"Paimon dynamic bucket table: write with assign parallelism") {
+ spark.sql(s"""
+ |CREATE TABLE T (
+ | pk STRING,
+ | v STRING,
+ | pt STRING)
+ |TBLPROPERTIES (
+ | 'primary-key' = 'pk, pt',
+ | 'bucket' = '-1',
+ | 'dynamic-bucket.target-row-num'='3',
+ | 'dynamic-bucket.assigner-parallelism'='3'
+ |)
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+
+ spark.sql(
+ "INSERT INTO T VALUES ('1', 'a', 'p'), ('2', 'b', 'p'), ('3', 'c', 'p'), ('4', 'd', 'p'), ('5', 'e', 'p')")
+
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY pk"),
+ Row("1", "a", "p") :: Row("2", "b", "p") :: Row("3", "c", "p") :: Row("4", "d", "p") :: Row(
+ "5",
+ "e",
+ "p") :: Nil)
+
+ checkAnswer(
+ spark.sql("SELECT DISTINCT bucket FROM `T$FILES`"),
+ Row(0) :: Row(1) :: Row(2) :: Nil)
+ }
+}