This is an automated email from the ASF dual-hosted git repository.
MaxGekk pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 8ef2ecf4afa9 [SPARK-51069][SQL] Add big-endian support to
UnsafeRowUtils.validateStructuralIntegrityWithReasonImpl
8ef2ecf4afa9 is described below
commit 8ef2ecf4afa9ec8284eea125b3c6fc786f0d42d6
Author: Jonathan Albrecht <[email protected]>
AuthorDate: Fri May 29 22:56:49 2026 +0200
[SPARK-51069][SQL] Add big-endian support to
UnsafeRowUtils.validateStructuralIntegrityWithReasonImpl
### What changes were proposed in this pull request?
In UnsafeRowUtils.validateStructuralIntegrityWithReasonImpl, when checking
if the unused bits in the field are all zeros for fixed length fields, add
functions that work on big endian platforms.
### Why are the changes needed?
The current checks only work on little endian platforms. This change
detects the platform endianness once, when UnsafeRowUtils is initialized, and
selects the matching getPadding* implementation for the platform, so there
should be no significant change in performance.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This was tested on amd64 (little-endian) and s390x (big-endian) with
existing unit tests. On s390x, many existing tests fail in the SQL projects.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #49773 from jonathan-albrecht-ibm/master-endian-unsafeRowUtils.
Authored-by: Jonathan Albrecht <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
(cherry picked from commit 06ffa273d0213eb14a96c923eba7901f05b23e08)
Signed-off-by: Max Gekk <[email protected]>
---
.../spark/sql/catalyst/util/UnsafeRowUtils.scala | 37 ++++++-
.../sql/catalyst/util/UnsafeRowUtilsSuite.scala | 116 +++++++++++++++++++++
2 files changed, 148 insertions(+), 5 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
index 4da9c88dd9a9..470b0e1f6a6b 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
@@ -17,11 +17,38 @@
package org.apache.spark.sql.catalyst.util
+import java.nio.ByteOrder.{nativeOrder, BIG_ENDIAN}
+
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.types._
object UnsafeRowUtils {
+ private[util] sealed trait PaddingProvider {
+ def getPadding(row: UnsafeRow, index: Int, bitShift: Int): Long
+ def getPaddingBoolean(row: UnsafeRow, index: Int): Long
+ }
+
+ private[util] object PaddingProviderLE extends PaddingProvider {
+ override def getPadding(
+ row: UnsafeRow, index: Int, bitShift: Int): Long = row.getLong(index) >>
bitShift
+ override def getPaddingBoolean(
+ row: UnsafeRow, index: Int): Long = row.getLong(index) >> 1
+ }
+
+ private[util] object PaddingProviderBE extends PaddingProvider {
+ override def getPadding(
+ row: UnsafeRow, index: Int, bitShift: Int): Long = row.getLong(index) <<
bitShift
+ override def getPaddingBoolean(
+ row: UnsafeRow, index: Int): Long = row.getLong(index) &
0xFEFFFFFFFFFFFFFFL
+ }
+
+ private val padder: PaddingProvider = if (nativeOrder() == BIG_ENDIAN) {
+ PaddingProviderBE
+ } else {
+ PaddingProviderLE
+ }
+
/**
* Use the following rules to check the integrity of the UnsafeRow:
* - schema.fields.length == row.numFields should always be true
@@ -74,23 +101,23 @@ object UnsafeRowUtils {
case (field, index) if UnsafeRow.isFixedLength(field.dataType) &&
!row.isNullAt(index) =>
field.dataType match {
case BooleanType =>
- if ((row.getLong(index) >> 1) != 0L) {
+ if (padder.getPaddingBoolean(row, index) != 0L) {
return Some(s"Fixed-length field validation error: field:
$field, index: $index")
}
case ByteType =>
- if ((row.getLong(index) >> 8) != 0L) {
+ if (padder.getPadding(row, index, 8) != 0L) {
return Some(s"Fixed-length field validation error: field:
$field, index: $index")
}
case ShortType =>
- if ((row.getLong(index) >> 16) != 0L) {
+ if (padder.getPadding(row, index, 16) != 0L) {
return Some(s"Fixed-length field validation error: field:
$field, index: $index")
}
case IntegerType =>
- if ((row.getLong(index) >> 32) != 0L) {
+ if (padder.getPadding(row, index, 32) != 0L) {
return Some(s"Fixed-length field validation error: field:
$field, index: $index")
}
case FloatType =>
- if ((row.getLong(index) >> 32) != 0L) {
+ if (padder.getPadding(row, index, 32) != 0L) {
return Some(s"Fixed-length field validation error: field:
$field, index: $index")
}
case _ =>
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
index 0b3f1f1bdb79..9f405818598b 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.util
import java.math.{BigDecimal => JavaBigDecimal}
+import java.nio.ByteOrder.{nativeOrder, BIG_ENDIAN}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow,
UnsafeProjection, UnsafeRow}
@@ -157,4 +158,119 @@ class UnsafeRowUtilsSuite extends SparkFunSuite {
StructType(StructField("field",
StructType(StructField("sub", nonBinaryStringType) :: Nil)) :: Nil)))
}
+
+ test("PaddingProvider handles endianness") {
+ // The following arrays contain the same 8 byte field values as represented
+ // in memory on little endian and big endian platforms. They are
+ // what would be seen in memory if the following values were set:
+ //
+ // row.setBoolean(0, true)
+ // row.setByte(1, 0xFF.toByte)
+ // row.setShort(2, 0xFFFE.toShort)
+ // row.setInt(3, 0xFFFEFDFC.toInt)
+ // row.setFloat(4, java.lang.Float.intBitsToFloat(0xFF7FFDFC.toInt))
+ //
+ // Note that in either platform endianness, the values are placed in the
first N bytes
+ // of the 8 byte field. The difference is in the order of the bytes in the
field.
+ val boolLE = Array[Byte](1, 0, 0, 0, 0, 0, 0, 0)
+ val byteLE = Array[Byte](0xFF.toByte, 0, 0, 0, 0, 0, 0, 0)
+ val shortLE = Array[Byte](0xFE.toByte, 0xFF.toByte, 0, 0, 0, 0, 0, 0)
+ val intLE = Array[Byte](0xFC.toByte, 0xFD.toByte, 0xFE.toByte,
0xFF.toByte, 0, 0, 0, 0)
+ val floatLE = Array[Byte](0xFC.toByte, 0xFD.toByte, 0x7F.toByte,
0xFF.toByte, 0, 0, 0, 0)
+
+ val boolBE = Array[Byte](1, 0, 0, 0, 0, 0, 0, 0)
+ val byteBE = Array[Byte](0xFF.toByte, 0, 0, 0, 0, 0, 0, 0)
+ val shortBE = Array[Byte](0xFF.toByte, 0xFE.toByte, 0, 0, 0, 0, 0, 0)
+ val intBE = Array[Byte](0xFF.toByte, 0xFE.toByte, 0xFD.toByte,
0xFC.toByte, 0, 0, 0, 0)
+ val floatBE = Array[Byte](0xFF.toByte, 0x7F.toByte, 0xFD.toByte,
0xFC.toByte, 0, 0, 0, 0)
+
+ // Corrupt field values
+ val boolCorruptLE = Array[Byte](2, 0, 0, 0, 0, 0, 0, 0)
+ val byteCorruptLE = Array[Byte](0xFF.toByte, 1, 0, 0, 0, 0, 0, 0)
+ val shortCorruptLE = Array[Byte](0xFE.toByte, 0xFF.toByte, 1, 0, 0, 0, 0,
0)
+ val intCorruptLE = Array[Byte](0xFC.toByte, 0xFD.toByte, 0xFE.toByte,
0xFF.toByte, 1, 0, 0, 0)
+ val floatCorruptLE = Array[Byte](0xFC.toByte, 0xFD.toByte, 0x7F.toByte,
0xFF.toByte, 1, 0, 0, 0)
+
+ val boolCorruptBE = Array[Byte](2, 0, 0, 0, 0, 0, 0, 0)
+ val byteCorruptBE = Array[Byte](0xFF.toByte, 1, 0, 0, 0, 0, 0, 0)
+ val shortCorruptBE = Array[Byte](0xFF.toByte, 0xFE.toByte, 1, 0, 0, 0, 0,
0)
+ val intCorruptBE = Array[Byte](0xFF.toByte, 0xFE.toByte, 0xFD.toByte,
0xFC.toByte, 1, 0, 0, 0)
+ val floatCorruptBE = Array[Byte](0xFF.toByte, 0x7F.toByte, 0xFD.toByte,
0xFC.toByte, 1, 0, 0, 0)
+
+ val numFields = 5
+ val fieldsStartIdx = UnsafeRow.calculateBitSetWidthInBytes(numFields)
+ val sizeInBytes = fieldsStartIdx + (numFields * 8)
+ val data = new Array[Byte](sizeInBytes)
+ val row = new UnsafeRow(numFields)
+ row.pointTo(data, sizeInBytes)
+
+ // Set all bits of all fields to 1 to ensure we don't miss overwriting any
fields later
+ row.setLong(0, 0xFFFFFFFFFFFFFFFFL)
+ row.setLong(1, 0xFFFFFFFFFFFFFFFFL)
+ row.setLong(2, 0xFFFFFFFFFFFFFFFFL)
+ row.setLong(3, 0xFFFFFFFFFFFFFFFFL)
+ row.setLong(4, 0xFFFFFFFFFFFFFFFFL)
+
+ // The PaddingProvider implementations get the full 8 bytes of the field
using
+ // UnsafeRow.getLong(n) which is platform endianness dependent.
+ // When testing PaddingProviderBE on little endian platforms, the big
endian byte arrays
+ // must be reversed so that UnsafeRow.getLong(n) returns the long value
that would be seen
+ // on a big endian platform.
+ // The opposite is true when testing PaddingProviderLE on big endian
platforms.
+ def overwrite(src: Array[Byte], fieldIdx: Int, reverse: Boolean): Unit = {
+ Array.copy(if (reverse) src.reverse else src, 0, data, fieldsStartIdx +
(fieldIdx * 8), 8)
+ }
+
+ // Overwrite the row's data with raw little endian values
+ // reversing the byte order if testing on big endian platforms.
+ overwrite(boolLE, 0, nativeOrder() == BIG_ENDIAN)
+ overwrite(byteLE, 1, nativeOrder() == BIG_ENDIAN)
+ overwrite(shortLE, 2, nativeOrder() == BIG_ENDIAN)
+ overwrite(intLE, 3, nativeOrder() == BIG_ENDIAN)
+ overwrite(floatLE, 4, nativeOrder() == BIG_ENDIAN)
+
+ assert(UnsafeRowUtils.PaddingProviderLE.getPaddingBoolean(row, 0) == 0)
+ assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 1, 8) == 0)
+ assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 2, 16) == 0)
+ assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 3, 32) == 0)
+ assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 4, 32) == 0)
+
+ overwrite(boolCorruptLE, 0, nativeOrder() == BIG_ENDIAN)
+ overwrite(byteCorruptLE, 1, nativeOrder() == BIG_ENDIAN)
+ overwrite(shortCorruptLE, 2, nativeOrder() == BIG_ENDIAN)
+ overwrite(intCorruptLE, 3, nativeOrder() == BIG_ENDIAN)
+ overwrite(floatCorruptLE, 4, nativeOrder() == BIG_ENDIAN)
+
+ assert(UnsafeRowUtils.PaddingProviderLE.getPaddingBoolean(row, 0) != 0)
+ assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 1, 8) != 0)
+ assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 2, 16) != 0)
+ assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 3, 32) != 0)
+ assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 4, 32) != 0)
+
+ // Overwrite the row's data with raw big endian values
+ // reversing the byte order if testing on little endian platforms.
+ overwrite(boolBE, 0, nativeOrder() != BIG_ENDIAN)
+ overwrite(byteBE, 1, nativeOrder() != BIG_ENDIAN)
+ overwrite(shortBE, 2, nativeOrder() != BIG_ENDIAN)
+ overwrite(intBE, 3, nativeOrder() != BIG_ENDIAN)
+ overwrite(floatBE, 4, nativeOrder() != BIG_ENDIAN)
+
+ assert(UnsafeRowUtils.PaddingProviderBE.getPaddingBoolean(row, 0) == 0)
+ assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 1, 8) == 0)
+ assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 2, 16) == 0)
+ assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 3, 32) == 0)
+ assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 4, 32) == 0)
+
+ overwrite(boolCorruptBE, 0, nativeOrder() != BIG_ENDIAN)
+ overwrite(byteCorruptBE, 1, nativeOrder() != BIG_ENDIAN)
+ overwrite(shortCorruptBE, 2, nativeOrder() != BIG_ENDIAN)
+ overwrite(intCorruptBE, 3, nativeOrder() != BIG_ENDIAN)
+ overwrite(floatCorruptBE, 4, nativeOrder() != BIG_ENDIAN)
+
+ assert(UnsafeRowUtils.PaddingProviderBE.getPaddingBoolean(row, 0) != 0)
+ assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 1, 8) != 0)
+ assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 2, 16) != 0)
+ assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 3, 32) != 0)
+ assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 4, 32) != 0)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]