This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9571a1bcdd [spark] Avoid using bucket function for timestamp with spark unsupported precision (#5556)
9571a1bcdd is described below

commit 9571a1bcddf0ac9d6e029c8f229aad991ec1eb6e
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Apr 30 10:11:06 2025 +0800

    [spark] Avoid using bucket function for timestamp with spark unsupported precision (#5556)
---
 .../spark/catalog/functions/PaimonFunctions.scala  | 39 ++++++++++++++++++++
 .../paimon/spark/commands/PaimonSparkWriter.scala  | 13 +++----
 .../apache/paimon/spark/PaimonSparkTestBase.scala  | 12 +++++++
 .../apache/paimon/spark/sql/SparkWriteITCase.scala | 42 ++++++++++++++++++++--
 .../spark/sql/TableValuedFunctionsTest.scala       | 11 ------
 5 files changed, 98 insertions(+), 19 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
index d3bfb083c1..4d0368fd3c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
@@ -23,7 +23,9 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableList,
 import org.apache.paimon.spark.SparkInternalRowWrapper
 import org.apache.paimon.spark.SparkTypeUtils.toPaimonRowType
 import org.apache.paimon.spark.catalog.functions.PaimonFunctions._
+import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.table.sink.KeyAndBucketExtractor.{bucket, bucketKeyHashCode}
+import org.apache.paimon.types.{ArrayType, DataType => PaimonDataType, LocalZonedTimestampType, MapType, RowType, TimestampType}
 import org.apache.paimon.utils.ProjectedRow
 
 import org.apache.spark.sql.catalyst.InternalRow
@@ -33,6 +35,8 @@ import org.apache.spark.sql.types.DataTypes.{IntegerType, StringType}
 
 import javax.annotation.Nullable
 
+import scala.collection.JavaConverters._
+
 object PaimonFunctions {
 
   val BUCKET: String = "bucket"
@@ -98,6 +102,41 @@ class BucketFunction extends UnboundFunction {
   override def name: String = BUCKET
 }
 
+object BucketFunction {
+
+  private val SPARK_TIMESTAMP_PRECISION = 6
+
+  def supportsTable(table: FileStoreTable): Boolean = {
+    table.bucketMode match {
+      case BucketMode.HASH_FIXED =>
+        table.schema().logicalBucketKeyType().getFieldTypes.asScala.forall(supportsType)
+      case _ => false
+    }
+  }
+
+  /**
+   * The reason for this check is that Spark's timestamp precision is fixed at 6, and in
+   * [[BucketFunction.bind]] we use `InternalRowSerializer(bucketKeyRowType)` to convert Paimon
+   * rows, but `bucketKeyRowType` is derived from Spark's StructType, which loses the true
+   * timestamp precision and leads to incorrect bucket calculations.
+   *
+   * TODO: find a way to get the correct Paimon type in BucketFunction, then remove this check.
+   */
+  private def supportsType(t: PaimonDataType): Boolean = t match {
+    case arrayType: ArrayType =>
+      supportsType(arrayType.getElementType)
+    case mapType: MapType =>
+      supportsType(mapType.getKeyType) && supportsType(mapType.getValueType)
+    case rowType: RowType =>
+      rowType.getFieldTypes.asScala.forall(supportsType)
+    case timestamp: TimestampType =>
+      timestamp.getPrecision == SPARK_TIMESTAMP_PRECISION
+    case localZonedTimestamp: LocalZonedTimestampType =>
+      localZonedTimestamp.getPrecision == SPARK_TIMESTAMP_PRECISION
+    case _ => true
+  }
+}
+
 /**
  * For partitioned tables, this function returns the maximum value of the first level partition of
  * the partitioned table, sorted alphabetically. Note, empty partitions will be skipped. For
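
The guard above exists because Spark's StructType only models microsecond timestamps, so the
bucket-key row type rebuilt on the Spark side always reports precision 6. Below is a minimal,
hypothetical sketch (not part of the commit) of the mismatch being guarded against; it assumes
the usual Paimon helpers (InternalRowSerializer.toBinaryRow, RowType.of,
KeyAndBucketExtractor.bucket / bucketKeyHashCode) and simplifies the real bind() path:

    // Illustrative only: hash the same TIMESTAMP(3) bucket key through the table's
    // real row type and through the TIMESTAMP(6) type reconstructed from Spark's
    // StructType. The binary layouts differ, so the hashes (and buckets) can differ.
    import org.apache.paimon.data.{GenericRow, Timestamp}
    import org.apache.paimon.data.serializer.InternalRowSerializer
    import org.apache.paimon.table.sink.KeyAndBucketExtractor.{bucket, bucketKeyHashCode}
    import org.apache.paimon.types.{DataTypes, RowType}

    val key = new GenericRow(1)
    key.setField(0, Timestamp.fromSQLTimestamp(java.sql.Timestamp.valueOf("2024-01-01 00:00:00.123")))

    val paimonSideHash = bucketKeyHashCode(
      new InternalRowSerializer(RowType.of(DataTypes.TIMESTAMP(3))).toBinaryRow(key))
    val sparkSideHash = bucketKeyHashCode(
      new InternalRowSerializer(RowType.of(DataTypes.TIMESTAMP(6))).toBinaryRow(key))

    // With 1024 buckets (as in the new test below), the two hashes may map to
    // different buckets, which is the anomaly BucketFunction now avoids.
    println(s"paimon bucket = ${bucket(paimonSideHash, 1024)}, spark bucket = ${bucket(sparkSideHash, 1024)}")
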
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index a2fe487bf6..831a7966f3 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -30,6 +30,7 @@ import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
 import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
 import org.apache.paimon.manifest.FileKind
 import org.apache.paimon.spark.{SparkRow, SparkTableWrite, SparkTypeUtils}
+import org.apache.paimon.spark.catalog.functions.BucketFunction
 import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, ROW_KIND_COL}
 import org.apache.paimon.spark.util.OptionUtils.paimonExtensionEnabled
 import org.apache.paimon.spark.util.SparkRowUtils
@@ -243,12 +244,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
       case HASH_FIXED =>
         if (table.bucketSpec().getNumBuckets == -2) {
           writeWithoutBucket(data)
-        } else if (!paimonExtensionEnabled) {
-          // Topology: input -> bucket-assigner -> shuffle by partition & bucket
-          writeWithBucketProcessor(
-            withInitBucketCol,
-            CommonBucketProcessor(table, bucketColIdx, encoderGroupWithBucketCol))
-        } else {
+        } else if (paimonExtensionEnabled && BucketFunction.supportsTable(table)) {
           // Topology: input -> shuffle by partition & bucket
           val bucketNumber = table.coreOptions().bucket()
           val bucketKeyCol = tableSchema
@@ -262,6 +258,11 @@ case class PaimonSparkWriter(table: FileStoreTable) {
             repartitionByPartitionsAndBucket(
               data.withColumn(BUCKET_COL, call_udf(BucketExpression.FIXED_BUCKET, args: _*)))
           writeWithBucket(repartitioned)
+        } else {
+          // Topology: input -> bucket-assigner -> shuffle by partition & bucket
+          writeWithBucketProcessor(
+            withInitBucketCol,
+            CommonBucketProcessor(table, bucketColIdx, encoderGroupWithBucketCol))
         }
 
       case _ =>
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 47f0c5a7d3..f3c2121860 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.catalog.{Catalog, Identifier}
+import org.apache.paimon.data.GenericRow
 import org.apache.paimon.fs.FileIO
 import org.apache.paimon.fs.local.LocalFileIO
 import org.apache.paimon.spark.catalog.WithPaimonCatalog
@@ -190,4 +191,15 @@ class PaimonSparkTestBase
       .scan
       .asInstanceOf[PaimonScan]
   }
+
+  object GenericRow {
+    def of(values: Any*): GenericRow = {
+      val row = new GenericRow(values.length)
+      values.zipWithIndex.foreach {
+        case (value, index) =>
+          row.setField(index, value)
+      }
+      row
+    }
+  }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
index d1cdea66cc..8bc640bf37 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
@@ -18,9 +18,10 @@
 
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.Snapshot
-import org.apache.paimon.io.DataFileMeta
+import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.schema.Schema
 import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.types.DataTypes
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
@@ -261,4 +262,41 @@ class SparkWriteITCase extends PaimonSparkTestBase {
 
     }
   }
+
+  test("Paimon write: write table with timestamp3 bucket key") {
+    withTable("t") {
+      // create timestamp3 table using table api
+      val schema = Schema.newBuilder
+        .column("id", DataTypes.INT)
+        .column("ts3", DataTypes.TIMESTAMP(3))
+        .option("bucket-key", "ts3")
+        .option("bucket", "1024")
+        .option("file.format", "avro")
+        .build
+      paimonCatalog.createTable(Identifier.create(dbName0, "t"), schema, false)
+
+      // insert using table api
+      val table = loadTable("t")
+      val writeBuilder = table.newBatchWriteBuilder
+      val write = writeBuilder.newWrite
+      write.write(
+        GenericRow.of(
+          1,
+          org.apache.paimon.data.Timestamp
+            .fromSQLTimestamp(java.sql.Timestamp.valueOf("2024-01-01 00:00:00"))))
+      val commit = writeBuilder.newCommit
+      commit.commit(write.prepareCommit())
+      commit.close()
+      write.close()
+
+      // write using spark sql
+      sql("INSERT INTO t VALUES (2, TIMESTAMP '2024-01-01 00:00:00')")
+
+      // check bucket id
+      checkAnswer(
+        sql("SELECT ts3, __paimon_bucket FROM t WHERE id = 1"),
+        sql("SELECT ts3, __paimon_bucket FROM t WHERE id = 2")
+      )
+    }
+  }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index 3c0d1a0d5d..b11f5877c5 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -367,15 +367,4 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase {
 
   private def utcMills(timestamp: String) =
     Timestamp.fromLocalDateTime(LocalDateTime.parse(timestamp)).getMillisecond
-
-  object GenericRow {
-    def of(values: Any*): GenericRow = {
-      val row = new GenericRow(values.length)
-      values.zipWithIndex.foreach {
-        case (value, index) =>
-          row.setField(index, value)
-      }
-      row
-    }
-  }
 }
