This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d7545d0fc096 [SPARK-50624][SQL] Add TimestampNTZType to ColumnarRow/MutableColumnarRow
d7545d0fc096 is described below
commit d7545d0fc0962849f979a29345ac14169688edf0
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Mon Jan 13 16:10:02 2025 +0800
[SPARK-50624][SQL] Add TimestampNTZType to ColumnarRow/MutableColumnarRow
### What changes were proposed in this pull request?
Noticed that `TimestampNTZType` was not handled by `ColumnarRow` and `MutableColumnarRow` when using them in Iceberg. See additional details in https://github.com/apache/iceberg/pull/11815#discussion_r1892693224
### Why are the changes needed?
To be able to read `TimestampNTZType` values when using `ColumnarRow` or `MutableColumnarRow`.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Added unit tests to `ColumnVectorSuite` and `ArrowColumnVectorSuite` that fail without the fix.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #49437 from nastra/SPARK-50624.
Authored-by: Eduard Tudenhoefner <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/vectorized/ColumnarRow.java | 2 ++
.../execution/vectorized/MutableColumnarRow.java | 4 ++++
.../execution/vectorized/ColumnVectorSuite.scala | 19 +++++++++++++++++
.../sql/vectorized/ArrowColumnVectorSuite.scala | 24 ++++++++++++++++++++++
4 files changed, 49 insertions(+)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
index aaac980bb332..ac05981da5a2 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
@@ -188,6 +188,8 @@ public final class ColumnarRow extends InternalRow {
return getInt(ordinal);
} else if (dataType instanceof TimestampType) {
return getLong(ordinal);
+ } else if (dataType instanceof TimestampNTZType) {
+ return getLong(ordinal);
} else if (dataType instanceof ArrayType) {
return getArray(ordinal);
} else if (dataType instanceof StructType) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
index 0464fe815989..42d39457330c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
@@ -82,6 +82,8 @@ public final class MutableColumnarRow extends InternalRow {
row.setInt(i, getInt(i));
} else if (dt instanceof TimestampType) {
row.setLong(i, getLong(i));
+ } else if (dt instanceof TimestampNTZType) {
+ row.setLong(i, getLong(i));
} else if (dt instanceof StructType) {
row.update(i, getStruct(i, ((StructType)
dt).fields().length).copy());
} else if (dt instanceof ArrayType) {
@@ -191,6 +193,8 @@ public final class MutableColumnarRow extends InternalRow {
return getInt(ordinal);
} else if (dataType instanceof TimestampType) {
return getLong(ordinal);
+ } else if (dataType instanceof TimestampNTZType) {
+ return getLong(ordinal);
} else if (dataType instanceof ArrayType) {
return getArray(ordinal);
} else if (dataType instanceof StructType structType) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 0cc4f7bf2548..0edbfd10d8cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -274,6 +274,19 @@ class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
}
}
+ testVectors("mutable ColumnarRow with TimestampNTZType", 10,
TimestampNTZType) { testVector =>
+ val mutableRow = new MutableColumnarRow(Array(testVector))
+ (0 until 10).foreach { i =>
+ mutableRow.rowId = i
+ mutableRow.setLong(0, 10 - i)
+ }
+ (0 until 10).foreach { i =>
+ mutableRow.rowId = i
+ assert(mutableRow.get(0, TimestampNTZType) === (10 - i))
+ assert(mutableRow.copy().get(0, TimestampNTZType) === (10 - i))
+ }
+ }
+
val arrayType: ArrayType = ArrayType(IntegerType, containsNull = true)
testVectors("array", 10, arrayType) { testVector =>
@@ -384,18 +397,24 @@ class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
}
val structType: StructType = new StructType().add("int",
IntegerType).add("double", DoubleType)
+ .add("ts", TimestampNTZType)
testVectors("struct", 10, structType) { testVector =>
val c1 = testVector.getChild(0)
val c2 = testVector.getChild(1)
+ val c3 = testVector.getChild(2)
c1.putInt(0, 123)
c2.putDouble(0, 3.45)
+ c3.putLong(0, 1000L)
c1.putInt(1, 456)
c2.putDouble(1, 5.67)
+ c3.putLong(1, 2000L)
assert(testVector.getStruct(0).get(0, IntegerType) === 123)
assert(testVector.getStruct(0).get(1, DoubleType) === 3.45)
+ assert(testVector.getStruct(0).get(2, TimestampNTZType) === 1000L)
assert(testVector.getStruct(1).get(0, IntegerType) === 456)
assert(testVector.getStruct(1).get(1, DoubleType) === 5.67)
+ assert(testVector.getStruct(1).get(2, TimestampNTZType) === 2000L)
}
testVectors("SPARK-44805: getInts with dictionary", 3, IntegerType) {
testVector =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/vectorized/ArrowColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/vectorized/ArrowColumnVectorSuite.scala
index 436cea50ad97..9180ce1aee19 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/vectorized/ArrowColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/vectorized/ArrowColumnVectorSuite.scala
@@ -515,4 +515,28 @@ class ArrowColumnVectorSuite extends SparkFunSuite {
columnVector.close()
allocator.close()
}
+
+ test("struct with TimestampNTZType") {
+ val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0,
Long.MaxValue)
+ val schema = new StructType().add("ts", TimestampNTZType)
+ val vector = ArrowUtils.toArrowField("struct", schema, nullable = true,
null)
+ .createVector(allocator).asInstanceOf[StructVector]
+ vector.allocateNew()
+ val timestampVector =
vector.getChildByOrdinal(0).asInstanceOf[TimeStampMicroVector]
+
+ vector.setIndexDefined(0)
+ timestampVector.setSafe(0, 1000L)
+
+ timestampVector.setValueCount(1)
+ vector.setValueCount(1)
+
+ val columnVector = new ArrowColumnVector(vector)
+ assert(columnVector.dataType === schema)
+
+ val row0 = columnVector.getStruct(0)
+ assert(row0.get(0, TimestampNTZType) === 1000L)
+
+ columnVector.close()
+ allocator.close()
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]