This is an automated email from the ASF dual-hosted git repository.
chenli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 039040085a feat(sort): add BINARY type support in sort operator (#4014)
039040085a is described below
commit 039040085ad98e3ab80c76cabf4f81275927f323
Author: carloea2 <[email protected]>
AuthorDate: Mon Nov 3 23:44:06 2025 -0800
feat(sort): add BINARY type support in sort operator (#4014)
## Summary
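This PR adds support for sorting `AttributeType.BINARY` values in
`StableMergeSortOpExec`.
Binary values (`byte[]`) are now ordered by unsigned lexicographic
comparison, implemented with Java's `java.util.Arrays.compareUnsigned`.
Each byte is compared as a value in 0..255, and the first differing byte
decides the order. If one array is a prefix of the other, the shorter array
sorts first.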
## Changes
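1. Operator: added a case for `AttributeType.BINARY` to
`compareTypedNonNullValues` in `StableMergeSortOpExec`, and documented the
per-type ordering semantics in its Scaladoc.
2. Testing: added three test cases to `StableMergeSortOpExecSpec`:
   - ascending BINARY order, including empty arrays and high-bit bytes
     (0x80, 0xFF);
   - descending BINARY order with nulls last, preserving stability for
     equal byte arrays;
   - an INTEGER secondary key breaking ties between equal BINARY primary
     keys.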
## Usage example
<img width="1669" height="815" alt="image"
src="https://github.com/user-attachments/assets/f627ebab-3cc1-42c5-8b28-d58274e10a32"
/>
Closes #4013
---------
Signed-off-by: carloea2 <[email protected]>
---
.../operator/sort/StableMergeSortOpExec.scala | 19 +++++-
.../operator/sort/StableMergeSortOpExecSpec.scala | 74 ++++++++++++++++++++++
2 files changed, 90 insertions(+), 3 deletions(-)
diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala
index 06ee9ed6da..ba13e12b8f 100644
--- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala
@@ -225,9 +225,17 @@ class StableMergeSortOpExec(descString: String) extends OperatorExecutor {
/**
* Compare two non-null values using their attribute type.
*
- * For DOUBLE:
- * - Uses java.lang.Double.compare (orders -Inf < ... < +Inf < NaN).
- * - Callers if desired should define how NaN interacts with ASC/DESC and
null policy.
+ * Type semantics:
+ * - INTEGER, LONG: numeric ascending via Java primitive compares.
+ * - DOUBLE: java.lang.Double.compare (orders -Inf < ... < +Inf < NaN).
+ * - BOOLEAN: false < true.
+ * - TIMESTAMP: java.sql.Timestamp#compareTo.
+ * - STRING: String#compareTo (UTF-16, lexicographic).
+ * - BINARY: unsigned lexicographic order over byte arrays:
+ * - Compare byte-by-byte treating each as 0..255 (mask 0xff).
+ * - The first differing byte decides the order.
+ * - If all compared bytes are equal, the shorter array sorts first.
+ * - Example: [] < [0x00] < [0x00,0x00] < [0x00,0x01] < [0x7F] <
[0x80] < [0xFF].
*/
private def compareTypedNonNullValues(
leftValue: Any,
@@ -258,6 +266,11 @@ class StableMergeSortOpExec(descString: String) extends OperatorExecutor {
.compareTo(rightValue.asInstanceOf[java.sql.Timestamp])
case AttributeType.STRING =>
leftValue.asInstanceOf[String].compareTo(rightValue.asInstanceOf[String])
+ case AttributeType.BINARY =>
+ java.util.Arrays.compareUnsigned(
+ leftValue.asInstanceOf[Array[Byte]],
+ rightValue.asInstanceOf[Array[Byte]]
+ )
case other =>
throw new IllegalStateException(s"Unsupported attribute type $other in
StableMergeSort")
}
diff --git a/common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala
index 9eead41cf1..ecb38cfff4 100644
--- a/common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala
+++ b/common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala
@@ -247,6 +247,30 @@ class StableMergeSortOpExecSpec extends AnyFlatSpec {
assert(desc.map(_.getField[Boolean]("bool")) == List(true, true, false,
false))
}
+ it should "sort BINARY ascending (unsigned lexicographic) incl. empty and
high-bit bytes" in {
+ val schema = schemaOf("bin" -> AttributeType.BINARY)
+
+ val bytesEmpty = Array[Byte]() // []
+ val bytes00 = Array(0x00.toByte) // [00]
+ val bytes0000 = Array(0x00.toByte, 0x00.toByte) // [00,00]
+ val bytes0001 = Array(0x00.toByte, 0x01.toByte) // [00,01]
+ val bytes7F = Array(0x7f.toByte) // [7F]
+ val bytes80 = Array(0x80.toByte) // [80] (-128)
+ val bytesFF = Array(0xff.toByte) // [FF] (-1)
+
+ val inputTuples = List(bytes80, bytes0000, bytesEmpty, bytesFF, bytes0001,
bytes00, bytes7F)
+ .map(arr => tupleOf(schema, "bin" -> arr))
+
+ val sorted = runStableMergeSort(schema, inputTuples) { _.keys =
sortKeysBuffer(sortKey("bin")) }
+
+ val actualUnsigned = sorted.map(_.getField[Array[Byte]]("bin").toSeq.map(b
=> b & 0xff))
+ val expectedUnsigned =
+ List(bytesEmpty, bytes00, bytes0000, bytes0001, bytes7F, bytes80,
bytesFF)
+ .map(_.toSeq.map(b => b & 0xff))
+
+ assert(actualUnsigned == expectedUnsigned)
+ }
+
//
===========================================================================
// B. Floating-point & Null/NaN policy
//
===========================================================================
@@ -334,6 +358,30 @@ class StableMergeSortOpExecSpec extends AnyFlatSpec {
)
}
+ it should "sort BINARY descending with nulls last and preserve stability for
equal byte arrays" in {
+ val schema = schemaOf("bin" -> AttributeType.BINARY, "id" ->
AttributeType.STRING)
+
+ val key00 = Array(0x00.toByte)
+ val keyFF = Array(0xff.toByte)
+
+ val inputTuples = List(
+ tupleOf(schema, "bin" -> keyFF, "id" -> "ff-1"),
+ tupleOf(schema, "bin" -> key00, "id" -> "00-1"),
+ tupleOf(
+ schema,
+ "bin" -> key00,
+ "id" -> "00-2"
+ ), // equal to previous; stability should keep order
+ tupleOf(schema, "bin" -> null, "id" -> "null-1")
+ )
+
+ val sorted = runStableMergeSort(schema, inputTuples) {
+ _.keys = sortKeysBuffer(sortKey("bin", SortPreference.DESC))
+ }
+
+ val idsInOrder = sorted.map(_.getField[String]("id"))
+ assert(idsInOrder == List("ff-1", "00-1", "00-2", "null-1"))
+ }
//
===========================================================================
// C. Multi-key semantics (lexicographic)
//
===========================================================================
@@ -475,6 +523,32 @@ class StableMergeSortOpExecSpec extends AnyFlatSpec {
)
}
+ it should "use INTEGER secondary key to break ties when primary BINARY keys
are equal" in {
+ val schema = schemaOf(
+ "bin" -> AttributeType.BINARY,
+ "score" -> AttributeType.INTEGER,
+ "label" -> AttributeType.STRING
+ )
+
+ val key00 = Array(0x00.toByte)
+ val key01 = Array(0x01.toByte)
+
+ val inputTuples = List(
+ tupleOf(schema, "bin" -> key01, "score" -> 1, "label" -> "01-score1"),
+ tupleOf(schema, "bin" -> key00, "score" -> 9, "label" -> "00-score9"),
+ tupleOf(schema, "bin" -> key01, "score" -> 2, "label" -> "01-score2")
+ )
+
+ val sorted = runStableMergeSort(schema, inputTuples) { desc =>
+ desc.keys = sortKeysBuffer(
+ sortKey("bin", SortPreference.ASC), // primary: binary ascending
+ sortKey("score", SortPreference.DESC) // secondary: integer descending
+ )
+ }
+
+ val labelsInOrder = sorted.map(_.getField[String]("label"))
+ assert(labelsInOrder == List("00-score9", "01-score2", "01-score1"))
+ }
//
===========================================================================
// D. Stability & operational behaviors
//
===========================================================================