This is an automated email from the ASF dual-hosted git repository.

chenli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 039040085a feat(sort): add BINARY type support in sort operator (#4014)
039040085a is described below

commit 039040085ad98e3ab80c76cabf4f81275927f323
Author: carloea2 <[email protected]>
AuthorDate: Mon Nov 3 23:44:06 2025 -0800

    feat(sort): add BINARY type support in sort operator (#4014)
    
    ### Summary
    This PR adds support for sorting AttributeType.BINARY values in
    StableMergeSortOpExec.
    
    Binary values (`byte[]`) are now ordered using unsigned lexicographic
    comparison, consistent with Java’s `Arrays.compareUnsigned`.
    
    ## Changes
    
    1. Operator: Added sorting support for AttributeType.BINARY in
    compareTypedNonNullValues in StableMergeSortOpExec.
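    2. Testing: Added three test cases in StableMergeSortOpExecSpec (ascending
    unsigned order, descending with nulls last and stability, and an INTEGER
    secondary key breaking ties between equal BINARY keys).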
    
    ## Use example
    <img width="1669" height="815" alt="image"
    src="https://github.com/user-attachments/assets/f627ebab-3cc1-42c5-8b28-d58274e10a32"
    />
    
    Closes #4013
    
    ---------
    
    Signed-off-by: carloea2 <[email protected]>
---
 .../operator/sort/StableMergeSortOpExec.scala      | 19 +++++-
 .../operator/sort/StableMergeSortOpExecSpec.scala  | 74 ++++++++++++++++++++++
 2 files changed, 90 insertions(+), 3 deletions(-)

diff --git 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala
index 06ee9ed6da..ba13e12b8f 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala
@@ -225,9 +225,17 @@ class StableMergeSortOpExec(descString: String) extends 
OperatorExecutor {
   /**
     * Compare two non-null values using their attribute type.
     *
-    * For DOUBLE:
-    *  - Uses java.lang.Double.compare (orders -Inf < ... < +Inf < NaN).
-    *  - Callers if desired should define how NaN interacts with ASC/DESC and 
null policy.
+    * Type semantics:
+    *  - INTEGER, LONG: numeric ascending via Java primitive compares.
+    *  - DOUBLE: java.lang.Double.compare (orders -Inf < ... < +Inf < NaN).
+    *  - BOOLEAN: false < true.
+    *  - TIMESTAMP: java.sql.Timestamp#compareTo.
+    *  - STRING: String#compareTo (UTF-16, lexicographic).
+    *  - BINARY: unsigned lexicographic order over byte arrays:
+    *        - Compare byte-by-byte treating each as 0..255 (mask 0xff).
+    *        - The first differing byte decides the order.
+    *        - If all compared bytes are equal, the shorter array sorts first.
+    *        - Example: [] < [0x00] < [0x00,0x00] < [0x00,0x01] < [0x7F] < 
[0x80] < [0xFF].
     */
   private def compareTypedNonNullValues(
       leftValue: Any,
@@ -258,6 +266,11 @@ class StableMergeSortOpExec(descString: String) extends 
OperatorExecutor {
           .compareTo(rightValue.asInstanceOf[java.sql.Timestamp])
       case AttributeType.STRING =>
         
leftValue.asInstanceOf[String].compareTo(rightValue.asInstanceOf[String])
+      case AttributeType.BINARY =>
+        java.util.Arrays.compareUnsigned(
+          leftValue.asInstanceOf[Array[Byte]],
+          rightValue.asInstanceOf[Array[Byte]]
+        )
       case other =>
         throw new IllegalStateException(s"Unsupported attribute type $other in 
StableMergeSort")
     }
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala
index 9eead41cf1..ecb38cfff4 100644
--- 
a/common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala
+++ 
b/common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala
@@ -247,6 +247,30 @@ class StableMergeSortOpExecSpec extends AnyFlatSpec {
     assert(desc.map(_.getField[Boolean]("bool")) == List(true, true, false, 
false))
   }
 
+  it should "sort BINARY ascending (unsigned lexicographic) incl. empty and 
high-bit bytes" in {
+    val schema = schemaOf("bin" -> AttributeType.BINARY)
+
+    val bytesEmpty = Array[Byte]() // []
+    val bytes00 = Array(0x00.toByte) // [00]
+    val bytes0000 = Array(0x00.toByte, 0x00.toByte) // [00,00]
+    val bytes0001 = Array(0x00.toByte, 0x01.toByte) // [00,01]
+    val bytes7F = Array(0x7f.toByte) // [7F]
+    val bytes80 = Array(0x80.toByte) // [80] (-128)
+    val bytesFF = Array(0xff.toByte) // [FF] (-1)
+
+    val inputTuples = List(bytes80, bytes0000, bytesEmpty, bytesFF, bytes0001, 
bytes00, bytes7F)
+      .map(arr => tupleOf(schema, "bin" -> arr))
+
+    val sorted = runStableMergeSort(schema, inputTuples) { _.keys = 
sortKeysBuffer(sortKey("bin")) }
+
+    val actualUnsigned = sorted.map(_.getField[Array[Byte]]("bin").toSeq.map(b 
=> b & 0xff))
+    val expectedUnsigned =
+      List(bytesEmpty, bytes00, bytes0000, bytes0001, bytes7F, bytes80, 
bytesFF)
+        .map(_.toSeq.map(b => b & 0xff))
+
+    assert(actualUnsigned == expectedUnsigned)
+  }
+
   // 
===========================================================================
   // B. Floating-point & Null/NaN policy
   // 
===========================================================================
@@ -334,6 +358,30 @@ class StableMergeSortOpExecSpec extends AnyFlatSpec {
     )
   }
 
+  it should "sort BINARY descending with nulls last and preserve stability for 
equal byte arrays" in {
+    val schema = schemaOf("bin" -> AttributeType.BINARY, "id" -> 
AttributeType.STRING)
+
+    val key00 = Array(0x00.toByte)
+    val keyFF = Array(0xff.toByte)
+
+    val inputTuples = List(
+      tupleOf(schema, "bin" -> keyFF, "id" -> "ff-1"),
+      tupleOf(schema, "bin" -> key00, "id" -> "00-1"),
+      tupleOf(
+        schema,
+        "bin" -> key00,
+        "id" -> "00-2"
+      ), // equal to previous; stability should keep order
+      tupleOf(schema, "bin" -> null, "id" -> "null-1")
+    )
+
+    val sorted = runStableMergeSort(schema, inputTuples) {
+      _.keys = sortKeysBuffer(sortKey("bin", SortPreference.DESC))
+    }
+
+    val idsInOrder = sorted.map(_.getField[String]("id"))
+    assert(idsInOrder == List("ff-1", "00-1", "00-2", "null-1"))
+  }
   // 
===========================================================================
   // C. Multi-key semantics (lexicographic)
   // 
===========================================================================
@@ -475,6 +523,32 @@ class StableMergeSortOpExecSpec extends AnyFlatSpec {
     )
   }
 
+  it should "use INTEGER secondary key to break ties when primary BINARY keys 
are equal" in {
+    val schema = schemaOf(
+      "bin" -> AttributeType.BINARY,
+      "score" -> AttributeType.INTEGER,
+      "label" -> AttributeType.STRING
+    )
+
+    val key00 = Array(0x00.toByte)
+    val key01 = Array(0x01.toByte)
+
+    val inputTuples = List(
+      tupleOf(schema, "bin" -> key01, "score" -> 1, "label" -> "01-score1"),
+      tupleOf(schema, "bin" -> key00, "score" -> 9, "label" -> "00-score9"),
+      tupleOf(schema, "bin" -> key01, "score" -> 2, "label" -> "01-score2")
+    )
+
+    val sorted = runStableMergeSort(schema, inputTuples) { desc =>
+      desc.keys = sortKeysBuffer(
+        sortKey("bin", SortPreference.ASC), // primary: binary ascending
+        sortKey("score", SortPreference.DESC) // secondary: integer descending
+      )
+    }
+
+    val labelsInOrder = sorted.map(_.getField[String]("label"))
+    assert(labelsInOrder == List("00-score9", "01-score2", "01-score1"))
+  }
   // 
===========================================================================
   // D. Stability & operational behaviors
   // 
===========================================================================

Reply via email to