This is an automated email from the ASF dual-hosted git repository.

chenli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 2780caa2b3 feat(operator): add Stable Incremental Merge Sort (Scala) 
(#3774)
2780caa2b3 is described below

commit 2780caa2b3fbf73767430b25f148b2b07b2a3801
Author: carloea2 <[email protected]>
AuthorDate: Fri Oct 17 10:27:16 2025 -0700

    feat(operator): add Stable Incremental Merge Sort (Scala) (#3774)
    
    **feat(operator): add Stable Merge Sort (Scala) with multi-key ordering,
    nulls handling, and offset/limit (Closes #3763)**
    
    ---
    
    ## Summary
    
    Adds a new **Stable Merge Sort** operator in Scala. Performs stable
    per-partition sort with multiple keys and post-sort offset/limit.
    
    **Closes #3763.**
    
    ### Highlights
    
    * New files:
    
      * `StableMergeSortOpDesc.scala` – logical desc + JSON schema.
      * `StableMergeSortOpExec.scala` – stable merge sort on buffered tuples.
      * `StableMergeSortOpExecSpec.scala` – tests.
    * Features:
    
      * Per-key options: `order: asc|desc`, `nulls: first|last`,
        `caseInsensitive` (STRING only).
      * Supported types: STRING, INTEGER, LONG, DOUBLE, BOOLEAN, TIMESTAMP.
      * Stable ordering preserved for equal keys.
      * Offset/limit applied **after** sorting.
    
    ### Tests
    
    * Asc/desc integers (stability).
    * Case-sensitive vs case-insensitive strings.
    * Nulls first/last.
    * Multi-key across mixed types.
    * Offset + limit.
    * Missing attribute / unsupported type errors.
    * Large input sanity.
    * Buffers until `onFinish`.
    
    ### Example config
    
    ```json
    {
      "keys": [
        { "attribute": "dept",  "order": "asc",  "nulls": "last" },
        { "attribute": "score", "order": "desc", "nulls": "last" },
        { "attribute": "name",  "order": "asc",  "nulls": "last", 
"caseInsensitive": true }
      ],
      "offset": 0,
      "limit": null
    }
    ```
    ---
    
    ---------
    
    Signed-off-by: carloea2 <[email protected]>
---
 .../org/apache/amber/operator/LogicalOp.scala      |   3 +-
 .../operator/sort/StableMergeSortOpDesc.scala      |  72 +++
 .../operator/sort/StableMergeSortOpExec.scala      | 264 ++++++++
 .../operator/sort/StableMergeSortOpExecSpec.scala  | 708 +++++++++++++++++++++
 .../src/assets/operator_images/StableMergeSort.png | Bin 0 -> 6808 bytes
 5 files changed, 1046 insertions(+), 1 deletion(-)

diff --git 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala
 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala
index 209ac0e481..cf74190f06 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala
@@ -61,7 +61,7 @@ import 
org.apache.amber.operator.reservoirsampling.ReservoirSamplingOpDesc
 import org.apache.amber.operator.sklearn._
 import org.apache.amber.operator.sklearn.training._
 import org.apache.amber.operator.sleep.SleepOpDesc
-import org.apache.amber.operator.sort.SortOpDesc
+import org.apache.amber.operator.sort.{SortOpDesc, StableMergeSortOpDesc}
 import org.apache.amber.operator.sortPartitions.SortPartitionsOpDesc
 import org.apache.amber.operator.source.apis.reddit.RedditSearchSourceOpDesc
 import org.apache.amber.operator.source.apis.twitter.v2.{
@@ -242,6 +242,7 @@ trait StateTransferFunc
     new Type(value = classOf[ArrowSourceOpDesc], name = "ArrowSource"),
     new Type(value = classOf[MachineLearningScorerOpDesc], name = "Scorer"),
     new Type(value = classOf[SortOpDesc], name = "Sort"),
+    new Type(value = classOf[StableMergeSortOpDesc], name = "StableMergeSort"),
     new Type(value = classOf[SklearnLogisticRegressionOpDesc], name = 
"SklearnLogisticRegression"),
     new Type(
       value = classOf[SklearnLogisticRegressionCVOpDesc],
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpDesc.scala
new file mode 100644
index 0000000000..efecc5a8de
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpDesc.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.operator.sort
+
+import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.amber.core.executor.OpExecWithClassName
+import org.apache.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.amber.core.workflow.{InputPort, OutputPort, PhysicalOp}
+import org.apache.amber.operator.LogicalOp
+import org.apache.amber.operator.metadata.{OperatorGroupConstants, 
OperatorInfo}
+import org.apache.amber.util.JSONUtils.objectMapper
+
+import scala.collection.mutable.ListBuffer
+
/**
  * Logical descriptor for the stable, per-partition merge sort.
  *
  * Declares the JSON-configurable list of sort keys (each with its own
  * ASC/DESC direction) and wires the physical plan to the executor class
  * that performs the incremental bucket-stack sort.
  */
//TODO(#3922): disallowing sorting on binary type
class StableMergeSortOpDesc extends LogicalOp {

  @JsonProperty(value = "keys", required = true)
  @JsonSchemaTitle("Sort Keys")
  @JsonPropertyDescription("List of attributes to sort by with ordering preferences")
  var keys: ListBuffer[SortCriteriaUnit] = _

  override def getPhysicalOp(
      workflowId: WorkflowIdentity,
      executionId: ExecutionIdentity
  ): PhysicalOp = {
    // Serialize this descriptor so the executor can re-hydrate the key config.
    val executorSpec = OpExecWithClassName(
      "org.apache.amber.operator.sort.StableMergeSortOpExec",
      objectMapper.writeValueAsString(this)
    )
    // A global sort needs all tuples of the partition on one worker: many-to-one.
    PhysicalOp
      .manyToOnePhysicalOp(workflowId, executionId, operatorIdentifier, executorSpec)
      .withInputPorts(operatorInfo.inputPorts)
      .withOutputPorts(operatorInfo.outputPorts)
  }

  override def operatorInfo: OperatorInfo =
    OperatorInfo(
      "Stable Merge Sort",
      "Stable per-partition sort with multi-key ordering (incremental stack of sorted buckets)",
      OperatorGroupConstants.SORT_GROUP,
      List(InputPort()),
      // Blocking output: nothing is emitted until the whole input is sorted.
      List(OutputPort(blocking = true))
    )
}
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala
new file mode 100644
index 0000000000..06ee9ed6da
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.operator.sort
+
+import org.apache.amber.core.executor.OperatorExecutor
+import org.apache.amber.core.tuple.{AttributeType, Schema, Tuple, TupleLike}
+import org.apache.amber.util.JSONUtils.objectMapper
+
+import scala.collection.mutable.ArrayBuffer
+
/**
  * Stable in-memory merge sort for a single input partition.
  *
  * Strategy:
  *  - Every incoming tuple becomes a sorted bucket of size 1.
  *  - A stack of sorted buckets is maintained in which adjacent buckets never
  *    share the same length.
  *  - Each push triggers "binary-carry" merges while the two topmost buckets
  *    are equal-sized.
  *  - At finish the stack is collapsed left-to-right; every merge is stable
  *    (the left, i.e. older, side wins on ties).
  *
  * Null policy:
  *  - Nulls always sort last, for both ascending and descending keys.
  */
class StableMergeSortOpExec(descString: String) extends OperatorExecutor {

  private val desc: StableMergeSortOpDesc =
    objectMapper.readValue(descString, classOf[StableMergeSortOpDesc])

  private var inputSchema: Schema = _

  /** One sort key resolved against the schema: column index, type, direction. */
  private case class CompiledSortKey(
      index: Int,
      attributeType: AttributeType,
      descending: Boolean
  )

  /** Lexicographic sort keys, compiled once when the first tuple arrives. */
  private var compiledSortKeys: Array[CompiledSortKey] = _

  /** Stack of sorted buckets. Invariant: no two adjacent buckets have equal lengths. */
  private var sortedBuckets: ArrayBuffer[ArrayBuffer[Tuple]] = _

  /** Exposed for testing: current bucket sizes from bottom to top of the stack. */
  private[sort] def debugBucketSizes: List[Int] =
    Option(sortedBuckets) match {
      case None          => Nil
      case Some(buckets) => buckets.iterator.filter(_ != null).map(_.size).toList
    }

  /** Initialize internal state. */
  override def open(): Unit = {
    sortedBuckets = ArrayBuffer.empty[ArrayBuffer[Tuple]]
  }

  /** Release internal buffers. */
  override def close(): Unit = {
    if (sortedBuckets != null) sortedBuckets.clear()
  }

  /**
    * Ingest one tuple; emission is deferred until [[onFinish]].
    *
    * The sort keys are compiled against the schema of the first tuple seen.
    * The tuple is wrapped in a size-1 sorted bucket and pushed onto the stack.
    */
  override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
    if (inputSchema == null) {
      inputSchema = tuple.getSchema
      compiledSortKeys = compileSortKeys(inputSchema)
    }
    pushBucketAndCombine(ArrayBuffer[Tuple](tuple))
    Iterator.empty
  }

  /**
    * Emit the fully sorted partition by collapsing the bucket stack
    * left-to-right. Stability holds because each merge prefers the left
    * (older) bucket on equal keys.
    */
  override def onFinish(port: Int): Iterator[TupleLike] = {
    if (sortedBuckets.isEmpty) return Iterator.empty

    // Left-to-right fold keeps older tuples ahead of newer ones on ties.
    val collapsed = sortedBuckets.reduceLeft(mergeSortedBuckets)
    sortedBuckets.clear()
    sortedBuckets.append(collapsed)
    collapsed.iterator
  }

  /**
    * Resolve the configured sort keys to schema indices and attribute types,
    * producing the array consumed by [[compareBySortKeys]].
    */
  private def compileSortKeys(schema: Schema): Array[CompiledSortKey] =
    desc.keys.map { criteria =>
      val attributeName = criteria.attributeName
      CompiledSortKey(
        schema.getIndex(attributeName),
        schema.getAttribute(attributeName).getType,
        criteria.sortPreference == SortPreference.DESC
      )
    }.toArray

  /**
    * Push an already-sorted bucket and run "binary-carry" merges while the two
    * topmost buckets have equal sizes.
    *
    * Preconditions:
    *  - The bucket must already be sorted by [[compareBySortKeys]].
    *
    * Postcondition:
    *  - No two adjacent buckets on the stack share a size.
    *
    * Stability:
    *  - Merging older-before-newer is what preserves stability; do not swap
    *    the merge operands.
    *
    * Complexity:
    *  - Amortized O(1) per push; O(n log n) total over n tuples.
    */
  private[sort] def pushBucketAndCombine(newBucket: ArrayBuffer[Tuple]): Unit = {
    sortedBuckets.append(newBucket)
    var carrying = true
    while (carrying) {
      val depth = sortedBuckets.length
      if (depth >= 2 && sortedBuckets(depth - 1).size == sortedBuckets(depth - 2).size) {
        // Pop the newer bucket first, then the older one beneath it.
        val newer = sortedBuckets.remove(depth - 1)
        val older = sortedBuckets.remove(depth - 2)
        sortedBuckets.append(mergeSortedBuckets(older, newer))
      } else {
        carrying = false
      }
    }
  }

  /**
    * Stable two-way merge of two buckets already sorted by [[compareBySortKeys]].
    *
    * Stability guarantee: when keys compare equal, the element from the left
    * bucket is emitted first. Both inputs must be sorted under the same key
    * configuration; behavior is undefined otherwise.
    *
    * Complexity: O(leftBucket.size + rightBucket.size).
    */
  private[sort] def mergeSortedBuckets(
      leftBucket: ArrayBuffer[Tuple],
      rightBucket: ArrayBuffer[Tuple]
  ): ArrayBuffer[Tuple] = {
    val merged = new ArrayBuffer[Tuple](leftBucket.size + rightBucket.size)
    var leftPos = 0
    var rightPos = 0
    while (leftPos < leftBucket.size && rightPos < rightBucket.size) {
      // "<= 0" takes the left element on ties, which makes the merge stable.
      if (compareBySortKeys(leftBucket(leftPos), rightBucket(rightPos)) <= 0) {
        merged += leftBucket(leftPos)
        leftPos += 1
      } else {
        merged += rightBucket(rightPos)
        rightPos += 1
      }
    }
    while (leftPos < leftBucket.size) {
      merged += leftBucket(leftPos)
      leftPos += 1
    }
    while (rightPos < rightBucket.size) {
      merged += rightBucket(rightPos)
      rightPos += 1
    }
    merged
  }

  /**
    * Lexicographic comparison of two tuples over the compiled sort keys.
    *
    * Semantics:
    *  - Nulls always sort last, regardless of direction.
    *  - Non-null values are compared type-aware via [[compareTypedNonNullValues]].
    *  - A key that compares equal falls through to the next key.
    *  - A descending key flips the sign of the base comparison.
    *
    * Requires [[compiledSortKeys]] to be initialized (done on the first tuple).
    */
  private def compareBySortKeys(left: Tuple, right: Tuple): Int = {
    var keyPos = 0
    val keyCount = compiledSortKeys.length
    while (keyPos < keyCount) {
      val key = compiledSortKeys(keyPos)
      val lv = left.getField[Any](key.index)
      val rv = right.getField[Any](key.index)

      if (lv == null || rv == null) {
        // Null policy: ALWAYS last, regardless of ASC/DESC.
        if (rv != null) return 1
        if (lv != null) return -1
        // Both null: equal on this key; fall through to the next key.
      } else {
        val raw = compareTypedNonNullValues(lv, rv, key.attributeType)
        if (raw != 0) return if (key.descending) -raw else raw
      }
      keyPos += 1
    }
    0
  }

  /**
    * Type-aware comparison of two non-null values.
    *
    * For DOUBLE:
    *  - Uses java.lang.Double.compare (orders -Inf < ... < +Inf < NaN).
    *  - Callers decide how NaN should interact with ASC/DESC and null policy.
    *
    * Throws IllegalStateException for unsupported attribute types.
    */
  private def compareTypedNonNullValues(
      leftValue: Any,
      rightValue: Any,
      attrType: AttributeType
  ): Int =
    attrType match {
      case AttributeType.STRING =>
        leftValue.asInstanceOf[String].compareTo(rightValue.asInstanceOf[String])
      case AttributeType.INTEGER =>
        java.lang.Integer.compare(
          leftValue.asInstanceOf[Number].intValue(),
          rightValue.asInstanceOf[Number].intValue()
        )
      case AttributeType.LONG =>
        java.lang.Long.compare(
          leftValue.asInstanceOf[Number].longValue(),
          rightValue.asInstanceOf[Number].longValue()
        )
      case AttributeType.DOUBLE =>
        java.lang.Double.compare(
          leftValue.asInstanceOf[Number].doubleValue(),
          rightValue.asInstanceOf[Number].doubleValue()
        )
      case AttributeType.BOOLEAN =>
        java.lang.Boolean.compare(leftValue.asInstanceOf[Boolean], rightValue.asInstanceOf[Boolean])
      case AttributeType.TIMESTAMP =>
        leftValue
          .asInstanceOf[java.sql.Timestamp]
          .compareTo(rightValue.asInstanceOf[java.sql.Timestamp])
      case other =>
        throw new IllegalStateException(s"Unsupported attribute type $other in StableMergeSort")
    }
}
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala
new file mode 100644
index 0000000000..9eead41cf1
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala
@@ -0,0 +1,708 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.operator.sort
+
+import org.apache.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.sql.Timestamp
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.jdk.CollectionConverters.IterableHasAsJava
+
+/**
+  * Integration and internal-behavior tests for [[StableMergeSortOpExec]].
+  *
+  * Scope & coverage:
+  *  - Single-key semantics across core types (BOOLEAN, INTEGER/LONG, DOUBLE, 
STRING, TIMESTAMP).
+  *  - Multi-key lexicographic ordering (mixed directions/types) with null/NaN 
handling.
+  *  - Stability guarantees (relative order preserved for equal keys) and 
pass-through when no keys.
+  *  - Incremental “bucket stack” invariants (binary-carry sizes; no adjacent 
equal sizes).
+  *  - Operational properties (buffering behavior, idempotent onFinish, scale 
sanity).
+  *  - Test hooks for internal merge logic (mergeSortedBuckets, 
pushBucketAndCombine).
+  *
+  * Null policy:
+  *  - Nulls are always ordered last, regardless of ASC/DESC.
+  *  - NaN participates as a non-null floating value per Double comparison 
semantics.
+  *
+  * Notes:
+  *  - Some tests rely on package-visible test hooks to validate internals 
deterministically.
+  */
+class StableMergeSortOpExecSpec extends AnyFlatSpec {
+
+  // 
===========================================================================
+  // Helpers
+  // 
===========================================================================
+
+  /** Build a Schema with (name, type) pairs, in-order. */
+  private def schemaOf(attributes: (String, AttributeType)*): Schema = {
+    attributes.foldLeft(Schema()) {
+      case (acc, (name, attrType)) => acc.add(new Attribute(name, attrType))
+    }
+  }
+
+  /**
+    * Construct a Tuple for the provided schema.
+    *
+    * @param values map-like varargs: "colName" -> value. Must provide every 
column.
+    * @throws NoSuchElementException if a provided key is not in the schema.
+    */
+  private def tupleOf(schema: Schema, values: (String, Any)*): Tuple = {
+    val valueMap = values.toMap
+    val builder = Tuple.builder(schema)
+    schema.getAttributeNames.asJava.forEach { name =>
+      builder.add(schema.getAttribute(name), valueMap(name))
+    }
+    builder.build()
+  }
+
+  /** Convenience builder for a single sort key with direction (ASC by 
default). */
+  private def sortKey(
+      attribute: String,
+      pref: SortPreference = SortPreference.ASC
+  ): SortCriteriaUnit = {
+    val criteria = new SortCriteriaUnit()
+    criteria.attributeName = attribute
+    criteria.sortPreference = pref
+    criteria
+  }
+
+  /** Convert varargs keys into the operator config buffer. */
+  private def sortKeysBuffer(keys: SortCriteriaUnit*): 
ListBuffer[SortCriteriaUnit] =
+    ListBuffer(keys: _*)
+
+  /**
+    * Run the operator on an in-memory sequence of tuples and capture all 
output.
+    * Output is only emitted at onFinish to preserve determinism.
+    */
+  private def runStableMergeSort(
+      schema: Schema,
+      tuples: Seq[Tuple]
+  )(configure: StableMergeSortOpDesc => Unit): List[Tuple] = {
+    val desc = new StableMergeSortOpDesc()
+    configure(desc)
+    val exec = new StableMergeSortOpExec(objectMapper.writeValueAsString(desc))
+    exec.open()
+    tuples.foreach(t => exec.processTuple(t, 0))
+    val result = exec.onFinish(0).map(_.asInstanceOf[Tuple]).toList
+    exec.close()
+    result
+  }
+
+  /** Internal test hook to read the current bucket sizes on the stack. */
+  private def getBucketSizes(exec: StableMergeSortOpExec): List[Int] = 
exec.debugBucketSizes
+
+  /** Decompose an integer into its set-bit powers of two (ascending).
+    * Used to check the binary-carry invariant.
+    */
+  private def binaryDecomposition(number: Int): List[Int] = {
+    var remaining = number
+    val powers = scala.collection.mutable.ListBuffer[Int]()
+    while (remaining > 0) {
+      val lowestSetBit = Integer.lowestOneBit(remaining)
+      powers += lowestSetBit
+      remaining -= lowestSetBit
+    }
+    powers.toList
+  }
+
+  // 
===========================================================================
+  // A. Single-key semantics
+  // 
===========================================================================
+
+  "StableMergeSortOpExec" should "sort integers ascending and preserve 
duplicate order" in {
+    val schema = schemaOf("value" -> AttributeType.INTEGER, "label" -> 
AttributeType.STRING)
+    val tuples = List(
+      tupleOf(schema, "value" -> 3, "label" -> "a"),
+      tupleOf(schema, "value" -> 1, "label" -> "first-1"),
+      tupleOf(schema, "value" -> 2, "label" -> "b"),
+      tupleOf(schema, "value" -> 1, "label" -> "first-2"),
+      tupleOf(schema, "value" -> 3, "label" -> "c")
+    )
+    val result = runStableMergeSort(schema, tuples) { _.keys = 
sortKeysBuffer(sortKey("value")) }
+    assert(result.map(_.getField[Int]("value")) == List(1, 1, 2, 3, 3))
+    val labelsForOnes =
+      result.filter(_.getField[Int]("value") == 
1).map(_.getField[String]("label"))
+    assert(labelsForOnes == List("first-1", "first-2"))
+  }
+
+  it should "sort integers descending while preserving stability" in {
+    val schema = schemaOf("value" -> AttributeType.INTEGER, "label" -> 
AttributeType.STRING)
+    val tuples = List(
+      tupleOf(schema, "value" -> 2, "label" -> "first"),
+      tupleOf(schema, "value" -> 2, "label" -> "second"),
+      tupleOf(schema, "value" -> 1, "label" -> "third"),
+      tupleOf(schema, "value" -> 3, "label" -> "fourth")
+    )
+    val result = runStableMergeSort(schema, tuples) {
+      _.keys = sortKeysBuffer(sortKey("value", SortPreference.DESC))
+    }
+    assert(result.map(_.getField[Int]("value")) == List(3, 2, 2, 1))
+    val labelsForTwos =
+      result.filter(_.getField[Int]("value") == 
2).map(_.getField[String]("label"))
+    assert(labelsForTwos == List("first", "second"))
+  }
+
+  it should "handle string ordering (case-sensitive)" in {
+    val schema = schemaOf("name" -> AttributeType.STRING)
+    val tuples = List(
+      tupleOf(schema, "name" -> "apple"),
+      tupleOf(schema, "name" -> "Banana"),
+      tupleOf(schema, "name" -> "banana"),
+      tupleOf(schema, "name" -> "APPLE")
+    )
+    val sorted = runStableMergeSort(schema, tuples) {
+      _.keys = sortKeysBuffer(sortKey("name", SortPreference.ASC))
+    }
+    assert(sorted.map(_.getField[String]("name")) == List("APPLE", "Banana", 
"apple", "banana"))
+  }
+
+  it should "order ASCII strings by Java compareTo (punctuation < digits < 
uppercase < lowercase)" in {
+    val schema = schemaOf("str" -> AttributeType.STRING)
+    val tuples = List("a", "A", "0", "~", "!").map(s => tupleOf(schema, "str" 
-> s))
+    val result = runStableMergeSort(schema, tuples) { _.keys = 
sortKeysBuffer(sortKey("str")) }
+    assert(result.map(_.getField[String]("str")) == List("!", "0", "A", "a", 
"~"))
+  }
+
+  it should "sort negatives and zeros correctly" in {
+    val schema = schemaOf("value" -> AttributeType.INTEGER)
+    val tuples = List(0, -1, -10, 5, -3, 2).map(v => tupleOf(schema, "value" 
-> v))
+    val result = runStableMergeSort(schema, tuples) { _.keys = 
sortKeysBuffer(sortKey("value")) }
+    assert(result.map(_.getField[Int]("value")) == List(-10, -3, -1, 0, 2, 5))
+  }
+
+  it should "sort LONG values ascending" in {
+    val schema = schemaOf("id" -> AttributeType.LONG)
+    val tuples = List(5L, 1L, 3L, 9L, 0L).map(v => tupleOf(schema, "id" -> v))
+    val result = runStableMergeSort(schema, tuples) { _.keys = 
sortKeysBuffer(sortKey("id")) }
+    assert(result.map(_.getField[Long]("id")) == List(0L, 1L, 3L, 5L, 9L))
+  }
+
+  it should "sort TIMESTAMP ascending" in {
+    val schema = schemaOf("timestamp" -> AttributeType.TIMESTAMP)
+    val base = Timestamp.valueOf("2022-01-01 00:00:00")
+    val tuples = List(
+      new Timestamp(base.getTime + 4000),
+      new Timestamp(base.getTime + 1000),
+      new Timestamp(base.getTime + 3000),
+      new Timestamp(base.getTime + 2000)
+    ).map(ts => tupleOf(schema, "timestamp" -> ts))
+    val result = runStableMergeSort(schema, tuples) {
+      _.keys = sortKeysBuffer(sortKey("timestamp", SortPreference.ASC))
+    }
+    val times = result.map(_.getField[Timestamp]("timestamp").getTime)
+    assert(times == times.sorted)
+  }
+
+  it should "sort TIMESTAMP descending" in {
+    val schema = schemaOf("timestamp" -> AttributeType.TIMESTAMP)
+    val base = Timestamp.valueOf("2023-01-01 00:00:00")
+    val tuples = List(
+      new Timestamp(base.getTime + 3000),
+      base,
+      new Timestamp(base.getTime + 1000),
+      new Timestamp(base.getTime + 2000)
+    ).map(ts => tupleOf(schema, "timestamp" -> ts))
+    val result = runStableMergeSort(schema, tuples) {
+      _.keys = sortKeysBuffer(sortKey("timestamp", SortPreference.DESC))
+    }
+    val times = result.map(_.getField[Timestamp]("timestamp").getTime)
+    assert(times == times.sorted(Ordering.Long.reverse))
+  }
+
+  it should "treat numeric strings as strings (lexicographic ordering)" in {
+    val schema = schemaOf("str" -> AttributeType.STRING)
+    val tuples = List("2", "10", "1", "11", "20").map(s => tupleOf(schema, 
"str" -> s))
+    val result = runStableMergeSort(schema, tuples) { _.keys = 
sortKeysBuffer(sortKey("str")) }
+    assert(result.map(_.getField[String]("str")) == List("1", "10", "11", "2", 
"20"))
+  }
+
+  it should "sort BOOLEAN ascending (false < true) and descending" in {
+    val schema = schemaOf("bool" -> AttributeType.BOOLEAN)
+    val tuples = List(true, false, true, false).map(v => tupleOf(schema, 
"bool" -> v))
+    val asc = runStableMergeSort(schema, tuples) {
+      _.keys = sortKeysBuffer(sortKey("bool", SortPreference.ASC))
+    }
+    assert(asc.map(_.getField[Boolean]("bool")) == List(false, false, true, 
true))
+    val desc = runStableMergeSort(schema, tuples) {
+      _.keys = sortKeysBuffer(sortKey("bool", SortPreference.DESC))
+    }
+    assert(desc.map(_.getField[Boolean]("bool")) == List(true, true, false, 
false))
+  }
+
+  // 
===========================================================================
+  // B. Floating-point & Null/NaN policy
+  // 
===========================================================================
+
+  it should "sort DOUBLE values including -0.0, 0.0, infinities and NaN" in {
+    val schema = schemaOf("x" -> AttributeType.DOUBLE)
+    val tuples =
+      List(Double.NaN, Double.PositiveInfinity, 1.5, -0.0, 0.0, -3.2, 
Double.NegativeInfinity)
+        .map(v => tupleOf(schema, "x" -> v))
+    val result = runStableMergeSort(schema, tuples) {
+      _.keys = sortKeysBuffer(sortKey("x"))
+    }
+    val values = result.map(_.getField[Double]("x"))
+    assert(values.head == Double.NegativeInfinity)
+    assert(values(1) == -3.2)
+    assert(java.lang.Double.compare(values(2), -0.0) == 0)
+    assert(java.lang.Double.compare(values(3), 0.0) == 0)
+    assert(values(4) == 1.5)
+    assert(values(5) == Double.PositiveInfinity)
+    assert(java.lang.Double.isNaN(values(6)))
+  }
+
+  it should "place NaN before null when sorting DOUBLE ascending (nulls last 
policy)" in {
+    val schema = schemaOf("x" -> AttributeType.DOUBLE)
+    val tuples = List(
+      tupleOf(schema, "x" -> null),
+      tupleOf(schema, "x" -> Double.NaN),
+      tupleOf(schema, "x" -> Double.NegativeInfinity),
+      tupleOf(schema, "x" -> 1.0),
+      tupleOf(schema, "x" -> Double.PositiveInfinity),
+      tupleOf(schema, "x" -> null)
+    )
+    val result = runStableMergeSort(schema, tuples) { _.keys = 
sortKeysBuffer(sortKey("x")) }
+    val values = result.map(_.getField[java.lang.Double]("x"))
+    assert(values.take(4).forall(_ != null)) // first 4 are non-null
+    assert(values(0).isInfinite && values(0) == Double.NegativeInfinity)
+    assert(values(1) == 1.0)
+    assert(values(2).isInfinite && values(2) == Double.PositiveInfinity)
+    assert(java.lang.Double.isNaN(values(3)))
+    assert(values.drop(4).forall(_ == null))
+  }
+
+  it should "place nulls last regardless of ascending or descending" in {
+    val schema = schemaOf("value" -> AttributeType.INTEGER, "label" -> 
AttributeType.STRING)
+    val tuples = List(
+      tupleOf(schema, "value" -> null, "label" -> "null-1"),
+      tupleOf(schema, "value" -> 5, "label" -> "five"),
+      tupleOf(schema, "value" -> null, "label" -> "null-2"),
+      tupleOf(schema, "value" -> 3, "label" -> "three")
+    )
+    val asc = runStableMergeSort(schema, tuples) {
+      _.keys = sortKeysBuffer(sortKey("value", SortPreference.ASC))
+    }
+    assert(asc.map(_.getField[String]("label")) == List("three", "five", 
"null-1", "null-2"))
+
+    val desc = runStableMergeSort(schema, tuples) {
+      _.keys = sortKeysBuffer(sortKey("value", SortPreference.DESC))
+    }
+    assert(desc.map(_.getField[String]("label")) == List("five", "three", 
"null-1", "null-2"))
+  }
+
+  it should "order NaN highest on secondary DESC but still place nulls last" 
in {
+    val schema = schemaOf(
+      "group" -> AttributeType.STRING,
+      "score" -> AttributeType.DOUBLE,
+      "label" -> AttributeType.STRING
+    )
+    val tuples = List(
+      tupleOf(schema, "group" -> "A", "score" -> java.lang.Double.NaN, "label" 
-> "nan"),
+      tupleOf(schema, "group" -> "A", "score" -> Double.PositiveInfinity, 
"label" -> "pinf"),
+      tupleOf(schema, "group" -> "A", "score" -> 1.0, "label" -> "one"),
+      tupleOf(schema, "group" -> "A", "score" -> 0.0, "label" -> "zero"),
+      tupleOf(schema, "group" -> "A", "score" -> -1.0, "label" -> "neg"),
+      tupleOf(schema, "group" -> "A", "score" -> Double.NegativeInfinity, 
"label" -> "ninf"),
+      tupleOf(schema, "group" -> "A", "score" -> null, "label" -> "null-1"),
+      tupleOf(schema, "group" -> "A", "score" -> null, "label" -> "null-2")
+    )
+    val result = runStableMergeSort(schema, tuples) { desc =>
+      desc.keys =
+        sortKeysBuffer(sortKey("group", SortPreference.ASC), sortKey("score", 
SortPreference.DESC))
+    }
+    assert(
+      result.map(_.getField[String]("label")) ==
+        List("nan", "pinf", "one", "zero", "neg", "ninf", "null-1", "null-2")
+    )
+  }
+
+  // 
===========================================================================
+  // C. Multi-key semantics (lexicographic)
+  // 
===========================================================================
+
+  it should "support multi-key sorting with mixed attribute types" in {
+    val schema = schemaOf(
+      "dept" -> AttributeType.STRING,
+      "score" -> AttributeType.DOUBLE,
+      "name" -> AttributeType.STRING,
+      "hired" -> AttributeType.TIMESTAMP
+    )
+    val base = new Timestamp(Timestamp.valueOf("2020-01-01 00:00:00").getTime)
+    val tuples = List(
+      tupleOf(schema, "dept" -> "Sales", "score" -> 9.5, "name" -> "Alice", 
"hired" -> base),
+      tupleOf(
+        schema,
+        "dept" -> "Sales",
+        "score" -> 9.5,
+        "name" -> "Bob",
+        "hired" -> new Timestamp(base.getTime + 1000)
+      ),
+      tupleOf(
+        schema,
+        "dept" -> "Sales",
+        "score" -> 8.0,
+        "name" -> "Carol",
+        "hired" -> new Timestamp(base.getTime + 2000)
+      ),
+      tupleOf(
+        schema,
+        "dept" -> "Engineering",
+        "score" -> 9.5,
+        "name" -> "Dave",
+        "hired" -> new Timestamp(base.getTime + 3000)
+      ),
+      tupleOf(
+        schema,
+        "dept" -> null,
+        "score" -> 9.5,
+        "name" -> "Eve",
+        "hired" -> new Timestamp(base.getTime + 4000)
+      )
+    )
+    val result = runStableMergeSort(schema, tuples) { desc =>
+      desc.keys = sortKeysBuffer(
+        sortKey("dept", SortPreference.ASC),
+        sortKey("score", SortPreference.DESC),
+        sortKey("name", SortPreference.ASC)
+      )
+    }
+    assert(result.map(_.getField[String]("name")) == List("Dave", "Alice", 
"Bob", "Carol", "Eve"))
+  }
+
+  it should "handle multi-key with descending primary and ascending secondary" 
in {
+    val schema = schemaOf(
+      "major" -> AttributeType.INTEGER,
+      "minor" -> AttributeType.INTEGER,
+      "idx" -> AttributeType.INTEGER
+    )
+    val tuples = List(
+      (1, 9, 0),
+      (1, 1, 1),
+      (2, 5, 2),
+      (2, 3, 3),
+      (1, 1, 4),
+      (3, 0, 5),
+      (3, 2, 6)
+    ).map { case (ma, mi, i) => tupleOf(schema, "major" -> ma, "minor" -> mi, 
"idx" -> i) }
+    val result = runStableMergeSort(schema, tuples) { desc =>
+      desc.keys =
+        sortKeysBuffer(sortKey("major", SortPreference.DESC), sortKey("minor", 
SortPreference.ASC))
+    }
+    val pairs = result.map(t => (t.getField[Int]("major"), 
t.getField[Int]("minor")))
+    assert(pairs == List((3, 0), (3, 2), (2, 3), (2, 5), (1, 1), (1, 1), (1, 
9)))
+    val idxFor11 = result
+      .filter(t => t.getField[Int]("major") == 1 && t.getField[Int]("minor") 
== 1)
+      .map(_.getField[Int]("idx"))
+    assert(idxFor11 == List(1, 4))
+  }
+
+  it should "use the third key as a tiebreaker (ASC, ASC, then DESC)" in {
+    val schema = schemaOf(
+      "keyA" -> AttributeType.INTEGER,
+      "keyB" -> AttributeType.INTEGER,
+      "keyC" -> AttributeType.INTEGER,
+      "id" -> AttributeType.STRING
+    )
+    val tuples = List(
+      (1, 1, 1, "x1"),
+      (1, 1, 3, "x3"),
+      (1, 1, 2, "x2"),
+      (1, 0, 9, "y9")
+    ).map {
+      case (a, b, c, id) => tupleOf(schema, "keyA" -> a, "keyB" -> b, "keyC" 
-> c, "id" -> id)
+    }
+    val result = runStableMergeSort(schema, tuples) {
+      _.keys =
+        sortKeysBuffer(sortKey("keyA"), sortKey("keyB"), sortKey("keyC", 
SortPreference.DESC))
+    }
+    assert(result.map(_.getField[String]("id")) == List("y9", "x3", "x2", 
"x1"))
+  }
+
+  it should "place nulls last across multiple keys (primary ASC, secondary 
DESC)" in {
+    val schema = schemaOf("keyA" -> AttributeType.STRING, "keyB" -> 
AttributeType.INTEGER)
+    val tuples = List(
+      ("x", 2),
+      (null, 1),
+      ("x", 1),
+      (null, 5),
+      ("a", 9),
+      ("a", 2)
+    ).map { case (s, i) => tupleOf(schema, "keyA" -> s, "keyB" -> i) }
+    val result = runStableMergeSort(schema, tuples) { desc =>
+      desc.keys =
+        sortKeysBuffer(sortKey("keyA", SortPreference.ASC), sortKey("keyB", 
SortPreference.DESC))
+    }
+    val out = result.map(t => (t.getField[String]("keyA"), 
t.getField[Int]("keyB")))
+    assert(out == List(("a", 9), ("a", 2), ("x", 2), ("x", 1), (null, 5), 
(null, 1)))
+  }
+
+  it should "when primary keys are both null, fall back to secondary ASC 
(nulls still after non-nulls)" in {
+    val schema = schemaOf(
+      "keyA" -> AttributeType.STRING,
+      "keyB" -> AttributeType.INTEGER,
+      "id" -> AttributeType.STRING
+    )
+    val tuples = List(
+      tupleOf(schema, "keyA" -> "A", "keyB" -> 2, "id" -> "non-null-a"),
+      tupleOf(schema, "keyA" -> null, "keyB" -> 5, "id" -> "null-a-5"),
+      tupleOf(schema, "keyA" -> null, "keyB" -> 1, "id" -> "null-a-1"),
+      tupleOf(schema, "keyA" -> "B", "keyB" -> 9, "id" -> "non-null-b")
+    )
+    val result = runStableMergeSort(schema, tuples) {
+      _.keys = sortKeysBuffer(sortKey("keyA"), sortKey("keyB"))
+    }
+    assert(
+      result
+        .map(_.getField[String]("id")) == List("non-null-a", "non-null-b", 
"null-a-1", "null-a-5")
+    )
+  }
+
+  // 
===========================================================================
+  // D. Stability & operational behaviors
+  // 
===========================================================================
+
+  it should "preserve original order among tuples with equal keys" in {
+    val schema = schemaOf("key" -> AttributeType.INTEGER, "index" -> 
AttributeType.INTEGER)
+    val tuples = (0 until 100).map(i => tupleOf(schema, "key" -> (i % 5), 
"index" -> i))
+    val result = runStableMergeSort(schema, tuples) { _.keys = 
sortKeysBuffer(sortKey("key")) }
+    val grouped = result.groupBy(_.getField[Int]("key")).values
+    grouped.foreach { group =>
+      val indices = group.map(_.getField[Int]("index"))
+      assert(indices == indices.sorted)
+    }
+  }
+
+  it should "act as a stable pass-through when keys are empty" in {
+    val schema = schemaOf("value" -> AttributeType.INTEGER, "label" -> 
AttributeType.STRING)
+    val tuples = List(3, 1, 4, 1, 5, 9).zipWithIndex
+      .map { case (v, i) => tupleOf(schema, "value" -> v, "label" -> 
s"row-$i") }
+    val result = runStableMergeSort(schema, tuples) { desc =>
+      desc.keys = ListBuffer.empty[SortCriteriaUnit]
+    }
+    assert(
+      result.map(_.getField[String]("label")) ==
+        List("row-0", "row-1", "row-2", "row-3", "row-4", "row-5")
+    )
+  }
+
+  it should "buffer tuples until onFinish is called" in {
+    val schema = schemaOf("value" -> AttributeType.INTEGER)
+    val tuple = tupleOf(schema, "value" -> 2)
+    val desc = new StableMergeSortOpDesc(); desc.keys = 
sortKeysBuffer(sortKey("value"))
+    val exec = new StableMergeSortOpExec(objectMapper.writeValueAsString(desc))
+    exec.open()
+    val immediate = exec.processTuple(tuple, 0)
+    assert(immediate.isEmpty)
+    val result = exec.onFinish(0).map(_.asInstanceOf[Tuple]).toList
+    assert(result.map(_.getField[Int]("value")) == List(2))
+    exec.close()
+  }
+
+  it should "return empty for empty input" in {
+    val schema = schemaOf("value" -> AttributeType.INTEGER)
+    val result = runStableMergeSort(schema, Seq.empty) { _.keys = 
sortKeysBuffer(sortKey("value")) }
+    assert(result.isEmpty)
+  }
+
+  it should "handle single element input" in {
+    val schema = schemaOf("value" -> AttributeType.INTEGER)
+    val result = runStableMergeSort(schema, Seq(tupleOf(schema, "value" -> 
42))) {
+      _.keys = sortKeysBuffer(sortKey("value"))
+    }
+    assert(result.map(_.getField[Int]("value")) == List(42))
+  }
+
+  it should "sort large inputs efficiently (sanity on boundaries)" in {
+    val schema = schemaOf("value" -> AttributeType.INTEGER, "label" -> 
AttributeType.STRING)
+    val tuples = (50000 to 1 by -1).map(i => tupleOf(schema, "value" -> i, 
"label" -> s"row-$i"))
+    val result = runStableMergeSort(schema, tuples) { _.keys = 
sortKeysBuffer(sortKey("value")) }
+    assert(result.head.getField[Int]("value") == 1)
+    assert(result(1).getField[Int]("value") == 2)
+    assert(result.takeRight(2).map(_.getField[Int]("value")) == List(49999, 
50000))
+  }
+
+  // 
===========================================================================
+  // E. Incremental bucket invariants (binary-carry & no-adjacent-equal)
+  // 
===========================================================================
+
+  it should "merge incrementally: bucket sizes match binary decomposition 
after each push" in {
+    val schema = schemaOf("value" -> AttributeType.INTEGER)
+    val desc = new StableMergeSortOpDesc(); desc.keys = 
sortKeysBuffer(sortKey("value"))
+    val exec = new StableMergeSortOpExec(objectMapper.writeValueAsString(desc))
+    exec.open()
+
+    val totalCount = 64
+    for (index <- (totalCount - 1) to 0 by -1) {
+      exec.processTuple(tupleOf(schema, "value" -> index), 0)
+      val sizes = getBucketSizes(exec).sorted
+      assert(sizes == binaryDecomposition(totalCount - index))
+    }
+
+    exec.close()
+  }
+
+  it should "maintain bucket-stack invariant (no adjacent equal sizes) after 
each insertion" in {
+    val schema = schemaOf("value" -> AttributeType.INTEGER)
+    val desc = new StableMergeSortOpDesc(); desc.keys = 
sortKeysBuffer(sortKey("value"))
+    val exec = new StableMergeSortOpExec(objectMapper.writeValueAsString(desc))
+    exec.open()
+
+    val totalCount = 200
+    val stream = (0 until totalCount by 2) ++ (1 until totalCount by 2)
+    stream.foreach { index =>
+      exec.processTuple(tupleOf(schema, "value" -> (totalCount - 1 - index)), 
0)
+      val sizes = getBucketSizes(exec)
+      sizes.sliding(2).foreach { pair =>
+        if (pair.length == 2) assert(pair.head != pair.last)
+      }
+    }
+
+    exec.close()
+  }
+
+  it should "form expected bucket sizes at milestones (1,2,3,4,7,8,15,16)" in {
+    val schema = schemaOf("value" -> AttributeType.INTEGER)
+    val desc = new StableMergeSortOpDesc(); desc.keys = 
sortKeysBuffer(sortKey("value"))
+    val exec = new StableMergeSortOpExec(objectMapper.writeValueAsString(desc))
+    exec.open()
+
+    val inputSequence = (100 to 1 by -1).map(i => tupleOf(schema, "value" -> 
i))
+    val milestones = Set(1, 2, 3, 4, 7, 8, 15, 16)
+    var pushed = 0
+    inputSequence.foreach { t =>
+      exec.processTuple(t, 0); pushed += 1
+      if (milestones.contains(pushed)) {
+        val sizes = getBucketSizes(exec).sorted
+        assert(sizes == binaryDecomposition(pushed))
+      }
+    }
+
+    exec.close()
+  }
+
+  // 
===========================================================================
+  // F. Internal hooks — merge behavior
+  // 
===========================================================================
+
+  "mergeSortedBuckets" should "be stable: left bucket wins on equal keys" in {
+    val schema = schemaOf("key" -> AttributeType.INTEGER, "id" -> 
AttributeType.STRING)
+    val desc = new StableMergeSortOpDesc(); desc.keys = 
sortKeysBuffer(sortKey("key"))
+    val exec = new 
StableMergeSortOpExec(objectMapper.writeValueAsString(desc)); exec.open()
+
+    // Seed to resolve schema/keys once.
+    exec.processTuple(tupleOf(schema, "key" -> 0, "id" -> "seed"), 0)
+
+    val left = ArrayBuffer(
+      tupleOf(schema, "key" -> 1, "id" -> "L1"),
+      tupleOf(schema, "key" -> 2, "id" -> "L2")
+    )
+    val right = ArrayBuffer(
+      tupleOf(schema, "key" -> 1, "id" -> "R1"),
+      tupleOf(schema, "key" -> 3, "id" -> "R3")
+    )
+
+    val merged = exec.mergeSortedBuckets(left, right)
+    val ids = merged.map(_.getField[String]("id")).toList
+    assert(ids == List("L1", "R1", "L2", "R3"))
+    exec.close()
+  }
+
+  "mergeSortedBuckets" should "handle empty left bucket" in {
+    val schema = schemaOf("key" -> AttributeType.INTEGER, "id" -> 
AttributeType.STRING)
+    val desc = new StableMergeSortOpDesc(); desc.keys = 
sortKeysBuffer(sortKey("key"))
+    val exec = new 
StableMergeSortOpExec(objectMapper.writeValueAsString(desc)); exec.open()
+    exec.processTuple(tupleOf(schema, "key" -> 0, "id" -> "seed"), 0) // seed 
keys
+
+    val left = ArrayBuffer.empty[Tuple]
+    val right = ArrayBuffer(
+      tupleOf(schema, "key" -> 1, "id" -> "r1"),
+      tupleOf(schema, "key" -> 2, "id" -> "r2")
+    )
+    val merged = exec.mergeSortedBuckets(left, right)
+    assert(merged.map(_.getField[String]("id")).toList == List("r1", "r2"))
+    exec.close()
+  }
+
+  "mergeSortedBuckets" should "handle empty right bucket" in {
+    val schema = schemaOf("key" -> AttributeType.INTEGER, "id" -> 
AttributeType.STRING)
+    val desc = new StableMergeSortOpDesc(); desc.keys = 
sortKeysBuffer(sortKey("key"))
+    val exec = new 
StableMergeSortOpExec(objectMapper.writeValueAsString(desc)); exec.open()
+    exec.processTuple(tupleOf(schema, "key" -> 0, "id" -> "seed"), 0)
+
+    val left = ArrayBuffer(
+      tupleOf(schema, "key" -> 1, "id" -> "l1"),
+      tupleOf(schema, "key" -> 2, "id" -> "l2")
+    )
+    val right = ArrayBuffer.empty[Tuple]
+    val merged = exec.mergeSortedBuckets(left, right)
+    assert(merged.map(_.getField[String]("id")).toList == List("l1", "l2"))
+    exec.close()
+  }
+
+  // 
===========================================================================
+  // G. Internal hooks — push/finish/idempotence & schema errors
+  // 
===========================================================================
+
+  "pushBucketAndCombine" should "merge two size-2 buckets into size-4 on push 
(with existing size-1 seed)" in {
+    val schema = schemaOf("value" -> AttributeType.INTEGER)
+    val desc = new StableMergeSortOpDesc(); desc.keys = 
sortKeysBuffer(sortKey("value"))
+    val exec = new 
StableMergeSortOpExec(objectMapper.writeValueAsString(desc)); exec.open()
+
+    // seed to compile keys -> results in one size-1 bucket in the stack
+    exec.processTuple(tupleOf(schema, "value" -> 0), 0)
+
+    // two pre-sorted buckets of size 2
+    val bucket1 = ArrayBuffer(tupleOf(schema, "value" -> 1), tupleOf(schema, 
"value" -> 3))
+    val bucket2 = ArrayBuffer(tupleOf(schema, "value" -> 2), tupleOf(schema, 
"value" -> 4))
+
+    exec.pushBucketAndCombine(bucket1) // sizes now [1,2]
+    exec.pushBucketAndCombine(bucket2) // equal top [2,2] => merged to 4; 
sizes [1,4]
+
+    val sizes = getBucketSizes(exec)
+    assert(sizes == List(1, 4))
+    exec.close()
+  }
+
+  it should "return the same sorted output if onFinish is called twice in a 
row" in {
+    val schema = schemaOf("value" -> AttributeType.INTEGER)
+    val desc = new StableMergeSortOpDesc(); desc.keys = 
sortKeysBuffer(sortKey("value"))
+    val exec = new 
StableMergeSortOpExec(objectMapper.writeValueAsString(desc)); exec.open()
+    List(3, 1, 2).foreach(i => exec.processTuple(tupleOf(schema, "value" -> 
i), 0))
+
+    val first = 
exec.onFinish(0).map(_.asInstanceOf[Tuple]).toList.map(_.getField[Int]("value"))
+    val second = 
exec.onFinish(0).map(_.asInstanceOf[Tuple]).toList.map(_.getField[Int]("value"))
+    assert(first == List(1, 2, 3))
+    assert(second == List(1, 2, 3))
+    exec.close()
+  }
+
+  it should "have processTuple always return empty iterators until finish" in {
+    val schema = schemaOf("value" -> AttributeType.INTEGER)
+    val desc = new StableMergeSortOpDesc(); desc.keys = 
sortKeysBuffer(sortKey("value"))
+    val exec = new 
StableMergeSortOpExec(objectMapper.writeValueAsString(desc)); exec.open()
+    val immediates = (10 to 1 by -1).map(i => 
exec.processTuple(tupleOf(schema, "value" -> i), 0))
+    assert(immediates.forall(_.isEmpty))
+    val out = 
exec.onFinish(0).map(_.asInstanceOf[Tuple]).toList.map(_.getField[Int]("value"))
+    assert(out == (1 to 10).toList)
+    exec.close()
+  }
+
+}
diff --git a/frontend/src/assets/operator_images/StableMergeSort.png 
b/frontend/src/assets/operator_images/StableMergeSort.png
new file mode 100644
index 0000000000..bf9ec5632d
Binary files /dev/null and 
b/frontend/src/assets/operator_images/StableMergeSort.png differ

Reply via email to