This is an automated email from the ASF dual-hosted git repository.

MaxGekk pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 6c65939f3f7e [SPARK-57103][SQL] Add hashing for nanosecond timestamp 
types
6c65939f3f7e is described below

commit 6c65939f3f7eadc8b78df25b7ebc1eb38a7df283
Author: Stevo Mitric <[email protected]>
AuthorDate: Tue Jun 2 10:13:08 2026 +0200

    [SPARK-57103][SQL] Add hashing for nanosecond timestamp types
    
    ### What changes were proposed in this pull request?
    Add hashing support for the nanosecond timestamp types 
`TimestampNTZNanosType(p)` and `TimestampLTZNanosType(p)`, in both the 
interpreted and codegen paths of hash.scala:
    
    - Murmur3Hash / XxHash64: mix both fields, following the existing 
CalendarInterval pattern - `hashInt(nanosWithinMicro,
    hashLong(epochMicros, seed))`.
    - HiveHash: a dedicated `hashTimestampNanos` that extends the existing 
`hashTimestamp` with the sub-microsecond nanoseconds using the same * 37 + 
field idiom as `hashCalendarInterval`.
    
    ### Why are the changes needed?
    hash-based GROUP BY / DISTINCT / joins - failed on nanosecond timestamp 
columns.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    New unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Generated-by: Claude Opus 4.7
    
    Closes #56203 from stevomitric/stevomitric/SPARK-57103-hash.
    
    Authored-by: Stevo Mitric <[email protected]>
    Signed-off-by: Max Gekk <[email protected]>
    (cherry picked from commit 76c26a60358081e49c7730fb18e63effe86176fd)
    Signed-off-by: Max Gekk <[email protected]>
---
 .../spark/sql/catalyst/expressions/hash.scala      | 28 +++++++-
 .../expressions/HashExpressionsSuite.scala         | 83 +++++++++++++++++++++-
 2 files changed, 109 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index 0f27dee9dbc8..3c1d666c89b3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.hash.Murmur3_x86_32
-import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.unsafe.types.{CalendarInterval, TimestampNanosVal, 
UTF8String}
 import org.apache.spark.util.ArrayImplicits._
 
 
////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -430,6 +430,11 @@ abstract class HashExpression[E] extends Expression {
     s"$result = $hasherClassName.hashInt($input.months, $microsecondsHash);"
   }
 
+  protected def genHashTimestampNanos(input: String, result: String): String = 
{
+    val epochMicrosHash = s"$hasherClassName.hashLong($input.epochMicros, 
$result)"
+    s"$result = $hasherClassName.hashInt($input.nanosWithinMicro, 
$epochMicrosHash);"
+  }
+
   protected def genHashString(
       ctx: CodegenContext, stringType: StringType, input: String, result: 
String): String = {
     if (stringType.supportsBinaryEquality) {
@@ -549,6 +554,8 @@ abstract class HashExpression[E] extends Expression {
     case ByteType | ShortType | IntegerType | DateType => genHashInt(input, 
result)
     case LongType | _: TimeType => genHashLong(input, result)
     case TimestampType | TimestampNTZType => genHashTimestamp(input, result)
+    case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
+      genHashTimestampNanos(input, result)
     case FloatType => genHashFloat(input, result)
     case DoubleType => genHashDouble(input, result)
     case d: DecimalType => genHashDecimal(ctx, d, input, result)
@@ -636,6 +643,7 @@ abstract class InterpretedHashFunction {
           hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, 
seed)
         }
       case c: CalendarInterval => hashInt(c.months, hashInt(c.days, 
hashLong(c.microseconds, seed)))
+      case t: TimestampNanosVal => hashInt(t.nanosWithinMicro, 
hashLong(t.epochMicros, seed))
       case a: Array[Byte] =>
         hashUnsafeBytes(a, Platform.BYTE_ARRAY_OFFSET, a.length, seed)
       case s: UTF8String =>
@@ -977,6 +985,12 @@ case class HiveHash(children: Seq[Expression]) extends 
HashExpression[Int] {
       $result = (int) 
${HiveHashFunction.getClass.getName.stripSuffix("$")}.hashTimestamp($input);
      """
 
+  override protected def genHashTimestampNanos(input: String, result: String): 
String =
+    s"""
+      $result = (int)
+        
${HiveHashFunction.getClass.getName.stripSuffix("$")}.hashTimestampNanos($input);
+     """
+
   override protected def genHashString(
       ctx: CodegenContext, stringType: StringType, input: String, result: 
String): String = {
     if (stringType.supportsBinaryEquality || !isCollationAware) {
@@ -1144,6 +1158,17 @@ object HiveHashFunction extends InterpretedHashFunction {
     ((result >>> 32) ^ result).toInt
   }
 
+  /**
+   * Extends [[hashTimestamp]] with the sub-microsecond nanoseconds carried by 
a
+   * [[TimestampNanosVal]], folding the extra field in with the same `* 37 + 
field` idiom used by
+   * [[hashCalendarInterval]]. Hive has no nanosecond-precision timestamp 
type, so this is a
+   * Spark-defined, self-consistent hash (equal values hash equally) rather 
than a Hive-compatible
+   * one.
+   */
+  def hashTimestampNanos(t: TimestampNanosVal): Long = {
+    (hashTimestamp(t.epochMicros) * 37) + t.nanosWithinMicro
+  }
+
   /**
    * Hive allows input intervals to be defined using units below but the 
intervals
    * have to be from the same category:
@@ -1242,6 +1267,7 @@ object HiveHashFunction extends InterpretedHashFunction {
 
       case d: Decimal => normalizeDecimal(d.toJavaBigDecimal).hashCode()
       case timestamp: Long if dataType.isInstanceOf[TimestampType] => 
hashTimestamp(timestamp)
+      case timestampNanos: TimestampNanosVal => 
hashTimestampNanos(timestampNanos)
       case calendarInterval: CalendarInterval => 
hashCalendarInterval(calendarInterval)
       case _ => super.hash(value, dataType, 0, isCollationAware, 
legacyCollationAwareHashing)
     }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
index 7bb62e8a836c..b26c48d65796 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
@@ -33,7 +33,8 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjecti
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, 
CollationFactory, DateTimeUtils, GenericArrayData, IntervalUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, StructType, _}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.hash.Murmur3_x86_32
+import org.apache.spark.unsafe.types.{TimestampNanosVal, UTF8String}
 import org.apache.spark.util.ArrayImplicits._
 
 class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -885,6 +886,86 @@ class HashExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
     checkEvaluation(HiveHash(Seq(time)), -1567775210)
   }
 
+  test("HashExpression supports nanosecond timestamp types") {
+    // (epochMicros, nanosWithinMicro) pairs covering zero/mid/max nanos, 
negative micros, and
+    // the Long epoch-micro boundaries.
+    val values = Seq(
+      TimestampNanosVal.fromParts(0L, 0.toShort),
+      TimestampNanosVal.fromParts(1L, 1.toShort),
+      TimestampNanosVal.fromParts(1234567890L, 999.toShort),
+      TimestampNanosVal.fromParts(-1L, 500.toShort),
+      TimestampNanosVal.fromParts(Long.MinValue, 0.toShort),
+      TimestampNanosVal.fromParts(Long.MaxValue, 999.toShort))
+
+    Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(9),
+        TimestampNTZNanosType(7), TimestampLTZNanosType(7)).foreach { dt =>
+      (values :+ null).foreach { v =>
+        // 1) Literal child: the value is embedded as a constant, so this 
asserts that the
+        // interpreted and codegen paths agree. (The unsafe projection here 
only round-trips the
+        // scalar hash result, not the nanos input -- that path is covered 
below.)
+        val lit = Literal.create(v, dt)
+        checkEvaluation(Murmur3Hash(Seq(lit), 42), Murmur3Hash(Seq(lit), 
42).eval())
+        checkEvaluation(XxHash64(Seq(lit), 42L), XxHash64(Seq(lit), 
42L).eval())
+        checkEvaluation(HiveHash(Seq(lit)), HiveHash(Seq(lit)).eval())
+
+        // 2) BoundReference over a row: drives the ordinal row-read 
(getTimestampNTZNanos /
+        // getTimestampLTZNanos) and the UnsafeRow round-trip of the nanos 
value itself -- the
+        // real GROUP BY / shuffle / join input path that the literal case 
above skips.
+        val row = InternalRow(v)
+        val ref = BoundReference(0, dt, nullable = true)
+        checkEvaluation(Murmur3Hash(Seq(ref), 42), Murmur3Hash(Seq(ref), 
42).eval(row), row)
+        checkEvaluation(XxHash64(Seq(ref), 42L), XxHash64(Seq(ref), 
42L).eval(row), row)
+        checkEvaluation(HiveHash(Seq(ref)), HiveHash(Seq(ref)).eval(row), row)
+      }
+    }
+  }
+
+  test("nanosecond timestamp hash is consistent with equality") {
+    val dt = TimestampNTZNanosType(9)
+    def lit(micros: Long, nanos: Short): Literal =
+      Literal.create(TimestampNanosVal.fromParts(micros, nanos), dt)
+
+    val a = lit(1234567890L, 123)
+    val aCopy = lit(1234567890L, 123)
+    val diffNanos = lit(1234567890L, 124) // same micros, different sub-micro 
nanos
+    val diffMicros = lit(1234567891L, 123) // different micros, same nanos
+
+    Seq[Expression => Any](
+      e => Murmur3Hash(Seq(e), 42).eval(),
+      e => XxHash64(Seq(e), 42L).eval(),
+      e => HiveHash(Seq(e)).eval()).foreach { hash =>
+      // Equal values hash equally.
+      assert(hash(a) === hash(aCopy))
+      // Both fields contribute to the hash (guards against a dropped 
epochMicros/nanos field).
+      assert(hash(a) !== hash(diffNanos))
+      assert(hash(a) !== hash(diffMicros))
+    }
+  }
+
+  test("nanosecond timestamp hash matches expected golden values") {
+    // The expected values are composed independently of the expression under 
test -- directly
+    // from the primitive hashers (and the separate hashTimestamp for Hive) 
with an explicit
+    // epochMicros-then-nanosWithinMicro folding order. So a wrong 
seed/constant or a swapped
+    // field order in the dispatch is caught, rather than masked by comparing 
the expression
+    // against itself.
+    val micros = 1234567890L
+    val nanos: Short = 789
+    val v = TimestampNanosVal.fromParts(micros, nanos)
+    val seed = 42
+    Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(9)).foreach { dt =>
+      val lit = Literal.create(v, dt)
+      checkEvaluation(
+        Murmur3Hash(Seq(lit), seed),
+        Murmur3_x86_32.hashInt(nanos, Murmur3_x86_32.hashLong(micros, seed)))
+      checkEvaluation(
+        XxHash64(Seq(lit), seed.toLong),
+        XXH64.hashInt(nanos, XXH64.hashLong(micros, seed.toLong)))
+      checkEvaluation(
+        HiveHash(Seq(lit)),
+        ((HiveHashFunction.hashTimestamp(micros) * 37) + nanos).toInt)
+    }
+  }
+
   private def testHash(inputSchema: StructType): Unit = {
     val inputGenerator = RandomDataGenerator.forType(inputSchema, nullable = 
false).get
     val toRow = ExpressionEncoder(inputSchema).createSerializer()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to