This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d7545d0fc096 [SPARK-50624][SQL] Add TimestampNTZType to ColumnarRow/MutableColumnarRow
d7545d0fc096 is described below

commit d7545d0fc0962849f979a29345ac14169688edf0
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Mon Jan 13 16:10:02 2025 +0800

    [SPARK-50624][SQL] Add TimestampNTZType to ColumnarRow/MutableColumnarRow
    
    ### What changes were proposed in this pull request?
    
    Support for `TimestampNTZType` was missing from `ColumnarRow` and
    `MutableColumnarRow`; this was noticed while using them in Iceberg. See
    additional details in https://github.com/apache/iceberg/pull/11815#discussion_r1892693224
    
    ### Why are the changes needed?
    
    To be able to read `TimestampNTZType` values when using `ColumnarRow` or `MutableColumnarRow`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Added unit tests to `ColumnVectorSuite` and `ArrowColumnVectorSuite` that fail without the fix.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #49437 from nastra/SPARK-50624.
    
    Authored-by: Eduard Tudenhoefner <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../apache/spark/sql/vectorized/ColumnarRow.java   |  2 ++
 .../execution/vectorized/MutableColumnarRow.java   |  4 ++++
 .../execution/vectorized/ColumnVectorSuite.scala   | 19 +++++++++++++++++
 .../sql/vectorized/ArrowColumnVectorSuite.scala    | 24 ++++++++++++++++++++++
 4 files changed, 49 insertions(+)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
index aaac980bb332..ac05981da5a2 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
@@ -188,6 +188,8 @@ public final class ColumnarRow extends InternalRow {
       return getInt(ordinal);
     } else if (dataType instanceof TimestampType) {
       return getLong(ordinal);
+    } else if (dataType instanceof TimestampNTZType) {
+      return getLong(ordinal);
     } else if (dataType instanceof ArrayType) {
       return getArray(ordinal);
     } else if (dataType instanceof StructType) {
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
index 0464fe815989..42d39457330c 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
@@ -82,6 +82,8 @@ public final class MutableColumnarRow extends InternalRow {
           row.setInt(i, getInt(i));
         } else if (dt instanceof TimestampType) {
           row.setLong(i, getLong(i));
+        } else if (dt instanceof TimestampNTZType) {
+          row.setLong(i, getLong(i));
         } else if (dt instanceof StructType) {
           row.update(i, getStruct(i, ((StructType) 
dt).fields().length).copy());
         } else if (dt instanceof ArrayType) {
@@ -191,6 +193,8 @@ public final class MutableColumnarRow extends InternalRow {
       return getInt(ordinal);
     } else if (dataType instanceof TimestampType) {
       return getLong(ordinal);
+    } else if (dataType instanceof TimestampNTZType) {
+      return getLong(ordinal);
     } else if (dataType instanceof ArrayType) {
       return getArray(ordinal);
     } else if (dataType instanceof StructType structType) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 0cc4f7bf2548..0edbfd10d8cd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -274,6 +274,19 @@ class ColumnVectorSuite extends SparkFunSuite with 
SQLHelper {
     }
   }
 
+  testVectors("mutable ColumnarRow with TimestampNTZType", 10, 
TimestampNTZType) { testVector =>
+    val mutableRow = new MutableColumnarRow(Array(testVector))
+    (0 until 10).foreach { i =>
+      mutableRow.rowId = i
+      mutableRow.setLong(0, 10 - i)
+    }
+    (0 until 10).foreach { i =>
+      mutableRow.rowId = i
+      assert(mutableRow.get(0, TimestampNTZType) === (10 - i))
+      assert(mutableRow.copy().get(0, TimestampNTZType) === (10 - i))
+    }
+  }
+
   val arrayType: ArrayType = ArrayType(IntegerType, containsNull = true)
   testVectors("array", 10, arrayType) { testVector =>
 
@@ -384,18 +397,24 @@ class ColumnVectorSuite extends SparkFunSuite with 
SQLHelper {
   }
 
   val structType: StructType = new StructType().add("int", 
IntegerType).add("double", DoubleType)
+    .add("ts", TimestampNTZType)
   testVectors("struct", 10, structType) { testVector =>
     val c1 = testVector.getChild(0)
     val c2 = testVector.getChild(1)
+    val c3 = testVector.getChild(2)
     c1.putInt(0, 123)
     c2.putDouble(0, 3.45)
+    c3.putLong(0, 1000L)
     c1.putInt(1, 456)
     c2.putDouble(1, 5.67)
+    c3.putLong(1, 2000L)
 
     assert(testVector.getStruct(0).get(0, IntegerType) === 123)
     assert(testVector.getStruct(0).get(1, DoubleType) === 3.45)
+    assert(testVector.getStruct(0).get(2, TimestampNTZType) === 1000L)
     assert(testVector.getStruct(1).get(0, IntegerType) === 456)
     assert(testVector.getStruct(1).get(1, DoubleType) === 5.67)
+    assert(testVector.getStruct(1).get(2, TimestampNTZType) === 2000L)
   }
 
   testVectors("SPARK-44805: getInts with dictionary", 3, IntegerType) { 
testVector =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/vectorized/ArrowColumnVectorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/vectorized/ArrowColumnVectorSuite.scala
index 436cea50ad97..9180ce1aee19 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/vectorized/ArrowColumnVectorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/vectorized/ArrowColumnVectorSuite.scala
@@ -515,4 +515,28 @@ class ArrowColumnVectorSuite extends SparkFunSuite {
     columnVector.close()
     allocator.close()
   }
+
+  test("struct with TimestampNTZType") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, 
Long.MaxValue)
+    val schema = new StructType().add("ts", TimestampNTZType)
+    val vector = ArrowUtils.toArrowField("struct", schema, nullable = true, 
null)
+      .createVector(allocator).asInstanceOf[StructVector]
+    vector.allocateNew()
+    val timestampVector = 
vector.getChildByOrdinal(0).asInstanceOf[TimeStampMicroVector]
+
+    vector.setIndexDefined(0)
+    timestampVector.setSafe(0, 1000L)
+
+    timestampVector.setValueCount(1)
+    vector.setValueCount(1)
+
+    val columnVector = new ArrowColumnVector(vector)
+    assert(columnVector.dataType === schema)
+
+    val row0 = columnVector.getStruct(0)
+    assert(row0.get(0, TimestampNTZType) === 1000L)
+
+    columnVector.close()
+    allocator.close()
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to