This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9571a1bcdd [spark] Avoid using bucket function for timestamp with spark unsupported precision (#5556)
9571a1bcdd is described below
commit 9571a1bcddf0ac9d6e029c8f229aad991ec1eb6e
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Apr 30 10:11:06 2025 +0800
[spark] Avoid using bucket function for timestamp with spark unsupported precision (#5556)
---
.../spark/catalog/functions/PaimonFunctions.scala | 39 ++++++++++++++++++++
.../paimon/spark/commands/PaimonSparkWriter.scala | 13 +++----
.../apache/paimon/spark/PaimonSparkTestBase.scala | 12 +++++++
.../apache/paimon/spark/sql/SparkWriteITCase.scala | 42 ++++++++++++++++++++--
.../spark/sql/TableValuedFunctionsTest.scala | 11 ------
5 files changed, 98 insertions(+), 19 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
index d3bfb083c1..4d0368fd3c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
@@ -23,7 +23,9 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableList,
import org.apache.paimon.spark.SparkInternalRowWrapper
import org.apache.paimon.spark.SparkTypeUtils.toPaimonRowType
import org.apache.paimon.spark.catalog.functions.PaimonFunctions._
+import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.table.sink.KeyAndBucketExtractor.{bucket, bucketKeyHashCode}
+import org.apache.paimon.types.{ArrayType, DataType => PaimonDataType, LocalZonedTimestampType, MapType, RowType, TimestampType}
import org.apache.paimon.utils.ProjectedRow
import org.apache.spark.sql.catalyst.InternalRow
@@ -33,6 +35,8 @@ import org.apache.spark.sql.types.DataTypes.{IntegerType, StringType}
import javax.annotation.Nullable
+import scala.collection.JavaConverters._
+
object PaimonFunctions {
val BUCKET: String = "bucket"
@@ -98,6 +102,41 @@ class BucketFunction extends UnboundFunction {
override def name: String = BUCKET
}
+object BucketFunction {
+
+ private val SPARK_TIMESTAMP_PRECISION = 6
+
+ def supportsTable(table: FileStoreTable): Boolean = {
+ table.bucketMode match {
+ case BucketMode.HASH_FIXED =>
+ table.schema().logicalBucketKeyType().getFieldTypes.asScala.forall(supportsType)
+ case _ => false
+ }
+ }
+
+ /**
+ * The reason for this check is that Spark's timestamp precision is fixed to 6, and in
+ * [[BucketFunction.bind]] we use `InternalRowSerializer(bucketKeyRowType)` to convert Paimon
+ * rows, but `bucketKeyRowType` is derived from Spark's StructType, which loses the true
+ * precision of the timestamp, leading to anomalies in bucket calculations.
+ *
+ * todo: find a way to get the correct Paimon type in BucketFunction, then remove this checker
+ */
+ private def supportsType(t: PaimonDataType): Boolean = t match {
+ case arrayType: ArrayType =>
+ supportsType(arrayType.getElementType)
+ case mapType: MapType =>
+ supportsType(mapType.getKeyType) && supportsType(mapType.getValueType)
+ case rowType: RowType =>
+ rowType.getFieldTypes.asScala.forall(supportsType)
+ case timestamp: TimestampType =>
+ timestamp.getPrecision == SPARK_TIMESTAMP_PRECISION
+ case localZonedTimestamp: LocalZonedTimestampType =>
+ localZonedTimestamp.getPrecision == SPARK_TIMESTAMP_PRECISION
+ case _ => true
+ }
+}
+
/**
* For partitioned tables, this function returns the maximum value of the first level partition of
* the partitioned table, sorted alphabetically. Note, empty partitions will be skipped. For
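
To make the precision issue described in the new Scaladoc concrete, here is a minimal, self-contained Scala sketch. It is not Paimon code; the bucket arithmetic is a simplified stand-in for KeyAndBucketExtractor.bucket, and it only illustrates that hashing a TIMESTAMP(3) value stored as epoch millis versus Spark's fixed microsecond representation generally lands in different buckets:

object PrecisionMismatchSketch {
  // Simplified bucket arithmetic, standing in for Paimon's real hashing/bucketing.
  def bucketOf(hash: Int, numBuckets: Int): Int = math.abs(hash % numBuckets)

  def main(args: Array[String]): Unit = {
    val millis = java.sql.Timestamp.valueOf("2024-01-01 00:00:00").getTime // TIMESTAMP(3): epoch millis
    val micros = millis * 1000L                                            // Spark TIMESTAMP(6): epoch micros
    // The same instant, hashed through two different physical representations,
    // usually yields different bucket ids, so the writer must not mix them.
    println(bucketOf(millis.##, 1024))
    println(bucketOf(micros.##, 1024))
  }
}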
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index a2fe487bf6..831a7966f3 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -30,6 +30,7 @@ import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
import org.apache.paimon.manifest.FileKind
import org.apache.paimon.spark.{SparkRow, SparkTableWrite, SparkTypeUtils}
+import org.apache.paimon.spark.catalog.functions.BucketFunction
import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, ROW_KIND_COL}
import org.apache.paimon.spark.util.OptionUtils.paimonExtensionEnabled
import org.apache.paimon.spark.util.SparkRowUtils
@@ -243,12 +244,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
case HASH_FIXED =>
if (table.bucketSpec().getNumBuckets == -2) {
writeWithoutBucket(data)
- } else if (!paimonExtensionEnabled) {
- // Topology: input -> bucket-assigner -> shuffle by partition & bucket
- writeWithBucketProcessor(
- withInitBucketCol,
- CommonBucketProcessor(table, bucketColIdx, encoderGroupWithBucketCol))
- } else {
+ } else if (paimonExtensionEnabled && BucketFunction.supportsTable(table)) {
// Topology: input -> shuffle by partition & bucket
val bucketNumber = table.coreOptions().bucket()
val bucketKeyCol = tableSchema
@@ -262,6 +258,11 @@ case class PaimonSparkWriter(table: FileStoreTable) {
repartitionByPartitionsAndBucket(
data.withColumn(BUCKET_COL, call_udf(BucketExpression.FIXED_BUCKET, args: _*)))
writeWithBucket(repartitioned)
+ } else {
+ // Topology: input -> bucket-assigner -> shuffle by partition & bucket
+ writeWithBucketProcessor(
+ withInitBucketCol,
+ CommonBucketProcessor(table, bucketColIdx, encoderGroupWithBucketCol))
}
case _ =>
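
The net effect on the HASH_FIXED write path can be summarized by the following sketch. chooseTopology is a hypothetical helper, not part of the commit; it only mirrors the branch order shown in the diff above:

import org.apache.paimon.spark.catalog.functions.BucketFunction
import org.apache.paimon.table.FileStoreTable

def chooseTopology(table: FileStoreTable, paimonExtensionEnabled: Boolean): String = {
  if (table.bucketSpec().getNumBuckets == -2) {
    "write without bucket"
  } else if (paimonExtensionEnabled && BucketFunction.supportsTable(table)) {
    // Optimized path: shuffle by partition & bucket via the bucket function.
    "input -> shuffle by partition & bucket"
  } else {
    // Fallback path, now also taken for timestamp bucket keys whose precision is not 6.
    "input -> bucket-assigner -> shuffle by partition & bucket"
  }
}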
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 47f0c5a7d3..f3c2121860 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -19,6 +19,7 @@
package org.apache.paimon.spark
import org.apache.paimon.catalog.{Catalog, Identifier}
+import org.apache.paimon.data.GenericRow
import org.apache.paimon.fs.FileIO
import org.apache.paimon.fs.local.LocalFileIO
import org.apache.paimon.spark.catalog.WithPaimonCatalog
@@ -190,4 +191,15 @@ class PaimonSparkTestBase
.scan
.asInstanceOf[PaimonScan]
}
+
+ object GenericRow {
+ def of(values: Any*): GenericRow = {
+ val row = new GenericRow(values.length)
+ values.zipWithIndex.foreach {
+ case (value, index) =>
+ row.setField(index, value)
+ }
+ row
+ }
+ }
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
index d1cdea66cc..8bc640bf37 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
@@ -18,9 +18,10 @@
package org.apache.paimon.spark.sql
-import org.apache.paimon.Snapshot
-import org.apache.paimon.io.DataFileMeta
+import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.schema.Schema
import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.types.DataTypes
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
@@ -261,4 +262,41 @@ class SparkWriteITCase extends PaimonSparkTestBase {
}
}
+
+ test("Paimon write: write table with timestamp3 bucket key") {
+ withTable("t") {
+ // create timestamp3 table using table api
+ val schema = Schema.newBuilder
+ .column("id", DataTypes.INT)
+ .column("ts3", DataTypes.TIMESTAMP(3))
+ .option("bucket-key", "ts3")
+ .option("bucket", "1024")
+ .option("file.format", "avro")
+ .build
+ paimonCatalog.createTable(Identifier.create(dbName0, "t"), schema, false)
+
+ // insert using table api
+ val table = loadTable("t")
+ val writeBuilder = table.newBatchWriteBuilder
+ val write = writeBuilder.newWrite
+ write.write(
+ GenericRow.of(
+ 1,
+ org.apache.paimon.data.Timestamp
+ .fromSQLTimestamp(java.sql.Timestamp.valueOf("2024-01-01 00:00:00"))))
+ val commit = writeBuilder.newCommit
+ commit.commit(write.prepareCommit())
+ commit.close()
+ write.close()
+
+ // write using spark sql
+ sql("INSERT INTO t VALUES (2, TIMESTAMP '2024-01-01 00:00:00')")
+
+ // check bucket id
+ checkAnswer(
+ sql("SELECT ts3, __paimon_bucket FROM t WHERE id = 1"),
+ sql("SELECT ts3, __paimon_bucket FROM t WHERE id = 2")
+ )
+ }
+ }
}
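
As a hedged companion to this test (not part of the commit; the table name t6 and the assertion are illustrative), a TIMESTAMP(6) bucket key still satisfies BucketFunction.supportsTable, so such tables keep the optimized shuffle-by-bucket path. This sketch assumes it runs inside SparkWriteITCase, where Schema, DataTypes, Identifier, dbName0 and loadTable are available:

// assumes: import org.apache.paimon.spark.catalog.functions.BucketFunction
val schema6 = Schema.newBuilder
  .column("id", DataTypes.INT)
  .column("ts6", DataTypes.TIMESTAMP(6))
  .option("bucket-key", "ts6")
  .option("bucket", "1024")
  .build
paimonCatalog.createTable(Identifier.create(dbName0, "t6"), schema6, false)
assert(BucketFunction.supportsTable(loadTable("t6")))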
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index 3c0d1a0d5d..b11f5877c5 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -367,15 +367,4 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase {
private def utcMills(timestamp: String) =
Timestamp.fromLocalDateTime(LocalDateTime.parse(timestamp)).getMillisecond
-
- object GenericRow {
- def of(values: Any*): GenericRow = {
- val row = new GenericRow(values.length)
- values.zipWithIndex.foreach {
- case (value, index) =>
- row.setField(index, value)
- }
- row
- }
- }
}