This is an automated email from the ASF dual-hosted git repository.

MaxGekk pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 8ef2ecf4afa9 [SPARK-51069][SQL] Add big-endian support to 
UnsafeRowUtils.validateStructuralIntegrityWithReasonImpl
8ef2ecf4afa9 is described below

commit 8ef2ecf4afa9ec8284eea125b3c6fc786f0d42d6
Author: Jonathan Albrecht <[email protected]>
AuthorDate: Fri May 29 22:56:49 2026 +0200

    [SPARK-51069][SQL] Add big-endian support to 
UnsafeRowUtils.validateStructuralIntegrityWithReasonImpl
    
    ### What changes were proposed in this pull request?
    In UnsafeRowUtils.validateStructuralIntegrityWithReasonImpl, when checking 
if the unused bits in the field are all zeros for fixed length fields, add 
functions that work on big endian platforms.
    
    ### Why are the changes needed?
    The current checks only work on little endian platforms. This change 
detects the platform endianness once, when UnsafeRowUtils is initialized, and 
selects the matching getPadding* implementation (PaddingProviderLE or 
PaddingProviderBE), so there should be no significant change in performance.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    This was tested on amd64 (little-endian) and s390x (big-endian) with 
existing unit tests. On s390x, many existing tests fail in the SQL projects.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #49773 from jonathan-albrecht-ibm/master-endian-unsafeRowUtils.
    
    Authored-by: Jonathan Albrecht <[email protected]>
    Signed-off-by: Max Gekk <[email protected]>
    (cherry picked from commit 06ffa273d0213eb14a96c923eba7901f05b23e08)
    Signed-off-by: Max Gekk <[email protected]>
---
 .../spark/sql/catalyst/util/UnsafeRowUtils.scala   |  37 ++++++-
 .../sql/catalyst/util/UnsafeRowUtilsSuite.scala    | 116 +++++++++++++++++++++
 2 files changed, 148 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
index 4da9c88dd9a9..470b0e1f6a6b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
@@ -17,11 +17,38 @@
 
 package org.apache.spark.sql.catalyst.util
 
+import java.nio.ByteOrder.{nativeOrder, BIG_ENDIAN}
+
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.types._
 
 object UnsafeRowUtils {
 
+  private[util] sealed trait PaddingProvider {
+    def getPadding(row: UnsafeRow, index: Int, bitShift: Int): Long
+    def getPaddingBoolean(row: UnsafeRow, index: Int): Long
+  }
+
+  private[util] object PaddingProviderLE extends PaddingProvider {
+    override def getPadding(
+      row: UnsafeRow, index: Int, bitShift: Int): Long = row.getLong(index) >> 
bitShift
+    override def getPaddingBoolean(
+      row: UnsafeRow, index: Int): Long = row.getLong(index) >> 1
+  }
+
+  private[util] object PaddingProviderBE extends PaddingProvider {
+    override def getPadding(
+      row: UnsafeRow, index: Int, bitShift: Int): Long = row.getLong(index) << 
bitShift
+    override def getPaddingBoolean(
+      row: UnsafeRow, index: Int): Long = row.getLong(index) & 
0xFEFFFFFFFFFFFFFFL
+  }
+
+  private val padder: PaddingProvider = if (nativeOrder() == BIG_ENDIAN) {
+    PaddingProviderBE
+  } else {
+    PaddingProviderLE
+  }
+
   /**
    * Use the following rules to check the integrity of the UnsafeRow:
    * - schema.fields.length == row.numFields should always be true
@@ -74,23 +101,23 @@ object UnsafeRowUtils {
       case (field, index) if UnsafeRow.isFixedLength(field.dataType) && 
!row.isNullAt(index) =>
         field.dataType match {
           case BooleanType =>
-            if ((row.getLong(index) >> 1) != 0L) {
+            if (padder.getPaddingBoolean(row, index) != 0L) {
               return Some(s"Fixed-length field validation error: field: 
$field, index: $index")
             }
           case ByteType =>
-            if ((row.getLong(index) >> 8) != 0L) {
+            if (padder.getPadding(row, index, 8) != 0L) {
               return Some(s"Fixed-length field validation error: field: 
$field, index: $index")
             }
           case ShortType =>
-            if ((row.getLong(index) >> 16) != 0L) {
+            if (padder.getPadding(row, index, 16) != 0L) {
               return Some(s"Fixed-length field validation error: field: 
$field, index: $index")
             }
           case IntegerType =>
-            if ((row.getLong(index) >> 32) != 0L) {
+            if (padder.getPadding(row, index, 32) != 0L) {
               return Some(s"Fixed-length field validation error: field: 
$field, index: $index")
             }
           case FloatType =>
-            if ((row.getLong(index) >> 32) != 0L) {
+            if (padder.getPadding(row, index, 32) != 0L) {
               return Some(s"Fixed-length field validation error: field: 
$field, index: $index")
             }
           case _ =>
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
index 0b3f1f1bdb79..9f405818598b 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtilsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.util
 
 import java.math.{BigDecimal => JavaBigDecimal}
+import java.nio.ByteOrder.{nativeOrder, BIG_ENDIAN}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, 
UnsafeProjection, UnsafeRow}
@@ -157,4 +158,119 @@ class UnsafeRowUtilsSuite extends SparkFunSuite {
       StructType(StructField("field",
         StructType(StructField("sub", nonBinaryStringType) :: Nil)) :: Nil)))
   }
+
+  test("PaddingProvider handles endianness") {
+    // The following arrays contain the same 8 byte field values as represented
+    // in memory on little endian and big endian platforms. They are
+    // what would be seen in memory if the following values were set:
+    //
+    //   row.setBoolean(0, true)
+    //   row.setByte(1, 0xFF.toByte)
+    //   row.setShort(2, 0xFFFE.toShort)
+    //   row.setInt(3, 0xFFFEFDFC.toInt)
+    //   row.setFloat(4, java.lang.Float.intBitsToFloat(0xFF7FFDFC.toInt))
+    //
+    // Note that in either platform endianness, the values are placed in the 
first N bytes
+    // of the 8 byte field. The difference is in the order of the bytes in the 
field.
+    val boolLE = Array[Byte](1, 0, 0, 0, 0, 0, 0, 0)
+    val byteLE = Array[Byte](0xFF.toByte, 0, 0, 0, 0, 0, 0, 0)
+    val shortLE = Array[Byte](0xFE.toByte, 0xFF.toByte, 0, 0, 0, 0, 0, 0)
+    val intLE = Array[Byte](0xFC.toByte, 0xFD.toByte, 0xFE.toByte, 
0xFF.toByte, 0, 0, 0, 0)
+    val floatLE = Array[Byte](0xFC.toByte, 0xFD.toByte, 0x7F.toByte, 
0xFF.toByte, 0, 0, 0, 0)
+
+    val boolBE = Array[Byte](1, 0, 0, 0, 0, 0, 0, 0)
+    val byteBE = Array[Byte](0xFF.toByte, 0, 0, 0, 0, 0, 0, 0)
+    val shortBE = Array[Byte](0xFF.toByte, 0xFE.toByte, 0, 0, 0, 0, 0, 0)
+    val intBE = Array[Byte](0xFF.toByte, 0xFE.toByte, 0xFD.toByte, 
0xFC.toByte, 0, 0, 0, 0)
+    val floatBE = Array[Byte](0xFF.toByte, 0x7F.toByte, 0xFD.toByte, 
0xFC.toByte, 0, 0, 0, 0)
+
+    // Corrupt field values
+    val boolCorruptLE = Array[Byte](2, 0, 0, 0, 0, 0, 0, 0)
+    val byteCorruptLE = Array[Byte](0xFF.toByte, 1, 0, 0, 0, 0, 0, 0)
+    val shortCorruptLE = Array[Byte](0xFE.toByte, 0xFF.toByte, 1, 0, 0, 0, 0, 
0)
+    val intCorruptLE = Array[Byte](0xFC.toByte, 0xFD.toByte, 0xFE.toByte, 
0xFF.toByte, 1, 0, 0, 0)
+    val floatCorruptLE = Array[Byte](0xFC.toByte, 0xFD.toByte, 0x7F.toByte, 
0xFF.toByte, 1, 0, 0, 0)
+
+    val boolCorruptBE = Array[Byte](2, 0, 0, 0, 0, 0, 0, 0)
+    val byteCorruptBE = Array[Byte](0xFF.toByte, 1, 0, 0, 0, 0, 0, 0)
+    val shortCorruptBE = Array[Byte](0xFF.toByte, 0xFE.toByte, 1, 0, 0, 0, 0, 
0)
+    val intCorruptBE = Array[Byte](0xFF.toByte, 0xFE.toByte, 0xFD.toByte, 
0xFC.toByte, 1, 0, 0, 0)
+    val floatCorruptBE = Array[Byte](0xFF.toByte, 0x7F.toByte, 0xFD.toByte, 
0xFC.toByte, 1, 0, 0, 0)
+
+    val numFields = 5
+    val fieldsStartIdx = UnsafeRow.calculateBitSetWidthInBytes(numFields)
+    val sizeInBytes = fieldsStartIdx + (numFields * 8)
+    val data = new Array[Byte](sizeInBytes)
+    val row = new UnsafeRow(numFields)
+    row.pointTo(data, sizeInBytes)
+
+    // Set all bits of all fields to 1 to ensure we don't miss overwriting any 
fields later
+    row.setLong(0, 0xFFFFFFFFFFFFFFFFL)
+    row.setLong(1, 0xFFFFFFFFFFFFFFFFL)
+    row.setLong(2, 0xFFFFFFFFFFFFFFFFL)
+    row.setLong(3, 0xFFFFFFFFFFFFFFFFL)
+    row.setLong(4, 0xFFFFFFFFFFFFFFFFL)
+
+    // The PaddingProvider implementations get the full 8 bytes of the field 
using
+    // UnsafeRow.getLong(n) which is platform endianness dependent.
+    // When testing PaddingProviderBE on little endian platforms, the big 
endian byte arrays
+    // must be reversed so that UnsafeRow.getLong(n) returns the long value 
that would be seen
+    // on a big endian platform.
+    // The opposite is true when testing PaddingProviderLE on big endian 
platforms.
+    def overwrite(src: Array[Byte], fieldIdx: Int, reverse: Boolean): Unit = {
+      Array.copy(if (reverse) src.reverse else src, 0, data, fieldsStartIdx + 
(fieldIdx * 8), 8)
+    }
+
+    // Overwrite the row's data with raw little endian values
+    // reversing the byte order if testing on big endian platforms.
+    overwrite(boolLE, 0, nativeOrder() == BIG_ENDIAN)
+    overwrite(byteLE, 1, nativeOrder() == BIG_ENDIAN)
+    overwrite(shortLE, 2, nativeOrder() == BIG_ENDIAN)
+    overwrite(intLE, 3, nativeOrder() == BIG_ENDIAN)
+    overwrite(floatLE, 4, nativeOrder() == BIG_ENDIAN)
+
+    assert(UnsafeRowUtils.PaddingProviderLE.getPaddingBoolean(row, 0) == 0)
+    assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 1, 8) == 0)
+    assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 2, 16) == 0)
+    assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 3, 32) == 0)
+    assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 4, 32) == 0)
+
+    overwrite(boolCorruptLE, 0, nativeOrder() == BIG_ENDIAN)
+    overwrite(byteCorruptLE, 1, nativeOrder() == BIG_ENDIAN)
+    overwrite(shortCorruptLE, 2, nativeOrder() == BIG_ENDIAN)
+    overwrite(intCorruptLE, 3, nativeOrder() == BIG_ENDIAN)
+    overwrite(floatCorruptLE, 4, nativeOrder() == BIG_ENDIAN)
+
+    assert(UnsafeRowUtils.PaddingProviderLE.getPaddingBoolean(row, 0) != 0)
+    assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 1, 8) != 0)
+    assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 2, 16) != 0)
+    assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 3, 32) != 0)
+    assert(UnsafeRowUtils.PaddingProviderLE.getPadding(row, 4, 32) != 0)
+
+    // Overwrite the row's data with raw big endian values
+    // reversing the byte order if testing on little endian platforms.
+    overwrite(boolBE, 0, nativeOrder() != BIG_ENDIAN)
+    overwrite(byteBE, 1, nativeOrder() != BIG_ENDIAN)
+    overwrite(shortBE, 2, nativeOrder() != BIG_ENDIAN)
+    overwrite(intBE, 3, nativeOrder() != BIG_ENDIAN)
+    overwrite(floatBE, 4, nativeOrder() != BIG_ENDIAN)
+
+    assert(UnsafeRowUtils.PaddingProviderBE.getPaddingBoolean(row, 0) == 0)
+    assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 1, 8) == 0)
+    assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 2, 16) == 0)
+    assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 3, 32) == 0)
+    assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 4, 32) == 0)
+
+    overwrite(boolCorruptBE, 0, nativeOrder() != BIG_ENDIAN)
+    overwrite(byteCorruptBE, 1, nativeOrder() != BIG_ENDIAN)
+    overwrite(shortCorruptBE, 2, nativeOrder() != BIG_ENDIAN)
+    overwrite(intCorruptBE, 3, nativeOrder() != BIG_ENDIAN)
+    overwrite(floatCorruptBE, 4, nativeOrder() != BIG_ENDIAN)
+
+    assert(UnsafeRowUtils.PaddingProviderBE.getPaddingBoolean(row, 0) != 0)
+    assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 1, 8) != 0)
+    assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 2, 16) != 0)
+    assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 3, 32) != 0)
+    assert(UnsafeRowUtils.PaddingProviderBE.getPadding(row, 4, 32) != 0)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to