This is an automated email from the ASF dual-hosted git repository.
chenli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 2780caa2b3 feat(operator): add Stable Incremental Merge Sort (Scala)
(#3774)
2780caa2b3 is described below
commit 2780caa2b3fbf73767430b25f148b2b07b2a3801
Author: carloea2 <[email protected]>
AuthorDate: Fri Oct 17 10:27:16 2025 -0700
feat(operator): add Stable Incremental Merge Sort (Scala) (#3774)
**feat(operator): add Stable Merge Sort (Scala) with multi-key ordering,
nulls handling, and offset/limit (Closes #3763)**
---
## Summary
Adds a new **Stable Merge Sort** operator in Scala. Performs stable
per-partition sort with multiple keys and post-sort offset/limit.
**Closes #3763.**
### Highlights
* New files:
* `StableMergeSortOpDesc.scala` – logical desc + JSON schema.
* `StableMergeSortOpExec.scala` – stable merge sort on buffered tuples.
* `StableMergeSortOpExecSpec.scala` – tests.
* Features:
* Per-key options: `order: asc|desc`, `nulls: first|last`,
`caseInsensitive` (STRING only).
* Supported types: STRING, INTEGER, LONG, DOUBLE, BOOLEAN, TIMESTAMP.
* Stable ordering preserved for equal keys.
* Offset/limit applied **after** sorting.
* NOTE (review): the `nulls: first|last`, `caseInsensitive`, and
`offset`/`limit` options listed above do not appear in the
`StableMergeSortOpDesc` added by this diff (it declares only `keys`, and
the executor always orders nulls last) — verify against the merged code.
### Tests
* Asc/desc integers (stability).
* Case-sensitive vs case-insensitive strings.
* Nulls first/last.
* Multi-key across mixed types.
* Offset + limit.
* Missing attribute / unsupported type errors.
* Large input sanity.
* Buffers until `onFinish`.
### Example config
```json
{
"keys": [
{ "attribute": "dept", "order": "asc", "nulls": "last" },
{ "attribute": "score", "order": "desc", "nulls": "last" },
{ "attribute": "name", "order": "asc", "nulls": "last",
"caseInsensitive": true }
],
"offset": 0,
"limit": null
}
```
---
---------
Signed-off-by: carloea2 <[email protected]>
---
.../org/apache/amber/operator/LogicalOp.scala | 3 +-
.../operator/sort/StableMergeSortOpDesc.scala | 72 +++
.../operator/sort/StableMergeSortOpExec.scala | 264 ++++++++
.../operator/sort/StableMergeSortOpExecSpec.scala | 708 +++++++++++++++++++++
.../src/assets/operator_images/StableMergeSort.png | Bin 0 -> 6808 bytes
5 files changed, 1046 insertions(+), 1 deletion(-)
diff --git
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala
index 209ac0e481..cf74190f06 100644
---
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala
@@ -61,7 +61,7 @@ import
org.apache.amber.operator.reservoirsampling.ReservoirSamplingOpDesc
import org.apache.amber.operator.sklearn._
import org.apache.amber.operator.sklearn.training._
import org.apache.amber.operator.sleep.SleepOpDesc
-import org.apache.amber.operator.sort.SortOpDesc
+import org.apache.amber.operator.sort.{SortOpDesc, StableMergeSortOpDesc}
import org.apache.amber.operator.sortPartitions.SortPartitionsOpDesc
import org.apache.amber.operator.source.apis.reddit.RedditSearchSourceOpDesc
import org.apache.amber.operator.source.apis.twitter.v2.{
@@ -242,6 +242,7 @@ trait StateTransferFunc
new Type(value = classOf[ArrowSourceOpDesc], name = "ArrowSource"),
new Type(value = classOf[MachineLearningScorerOpDesc], name = "Scorer"),
new Type(value = classOf[SortOpDesc], name = "Sort"),
+ new Type(value = classOf[StableMergeSortOpDesc], name = "StableMergeSort"),
new Type(value = classOf[SklearnLogisticRegressionOpDesc], name =
"SklearnLogisticRegression"),
new Type(
value = classOf[SklearnLogisticRegressionCVOpDesc],
diff --git
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpDesc.scala
new file mode 100644
index 0000000000..efecc5a8de
--- /dev/null
+++
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpDesc.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.operator.sort
+
+import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.amber.core.executor.OpExecWithClassName
+import org.apache.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.amber.core.workflow.{InputPort, OutputPort, PhysicalOp}
+import org.apache.amber.operator.LogicalOp
+import org.apache.amber.operator.metadata.{OperatorGroupConstants,
OperatorInfo}
+import org.apache.amber.util.JSONUtils.objectMapper
+
+import scala.collection.mutable.ListBuffer
+
/**
 * Logical descriptor for the Stable Merge Sort operator.
 *
 * Declares the user-facing configuration (the ordered list of sort keys) and
 * wires the logical operator to its physical executor,
 * [[StableMergeSortOpExec]], which performs a stable per-partition sort using
 * an incremental stack of sorted buckets merged pairwise. The key list defines
 * the lexicographic order and the per-key direction (ASC/DESC).
 */
//TODO(#3922): disallow sorting on binary type
class StableMergeSortOpDesc extends LogicalOp {

  // Ordered sort keys; earlier entries take precedence (lexicographic order).
  // Populated by Jackson from the operator's JSON configuration.
  @JsonProperty(value = "keys", required = true)
  @JsonSchemaTitle("Sort Keys")
  @JsonPropertyDescription("List of attributes to sort by with ordering preferences")
  var keys: ListBuffer[SortCriteriaUnit] = _

  /**
   * Build the physical operator for this logical operator.
   *
   * The full descriptor is serialized to JSON and handed to the executor,
   * which re-parses it to recover the sort-key configuration. The operator is
   * many-to-one: all tuples of a partition are gathered into one executor.
   */
  override def getPhysicalOp(
      workflowId: WorkflowIdentity,
      executionId: ExecutionIdentity
  ): PhysicalOp = {
    // Executor is addressed by class name; config travels as a JSON string.
    val executor = OpExecWithClassName(
      "org.apache.amber.operator.sort.StableMergeSortOpExec",
      objectMapper.writeValueAsString(this)
    )
    PhysicalOp
      .manyToOnePhysicalOp(workflowId, executionId, operatorIdentifier, executor)
      .withInputPorts(operatorInfo.inputPorts)
      .withOutputPorts(operatorInfo.outputPorts)
  }

  /** Static metadata shown in the UI; the output port blocks until input finishes. */
  override def operatorInfo: OperatorInfo =
    OperatorInfo(
      "Stable Merge Sort",
      "Stable per-partition sort with multi-key ordering (incremental stack of sorted buckets)",
      OperatorGroupConstants.SORT_GROUP,
      List(InputPort()),
      List(OutputPort(blocking = true))
    )
}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala
new file mode 100644
index 0000000000..06ee9ed6da
--- /dev/null
+++
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.operator.sort
+
+import org.apache.amber.core.executor.OperatorExecutor
+import org.apache.amber.core.tuple.{AttributeType, Schema, Tuple, TupleLike}
+import org.apache.amber.util.JSONUtils.objectMapper
+
+import scala.collection.mutable.ArrayBuffer
+
/**
 * Stable in-memory merge sort for a single input partition.
 *
 * Strategy:
 *  - Buffer incoming tuples as size-1 sorted buckets.
 *  - Maintain a stack of buckets where adjacent buckets never share the same length.
 *  - On each push, perform "binary-carry" merges while the top two buckets have equal sizes.
 *  - At finish, collapse the stack left-to-right. Merging is stable (left wins on ties).
 *
 * Null policy:
 *  - Nulls are always ordered last, regardless of ascending/descending per key.
 *
 * Memory note: all input tuples are buffered until [[onFinish]]; nothing is
 * emitted from [[processTuple]].
 */
class StableMergeSortOpExec(descString: String) extends OperatorExecutor {

  // Re-parse the serialized descriptor produced by StableMergeSortOpDesc.getPhysicalOp.
  private val desc: StableMergeSortOpDesc =
    objectMapper.readValue(descString, classOf[StableMergeSortOpDesc])

  // Captured from the first tuple seen; assumed identical for all subsequent tuples.
  private var inputSchema: Schema = _

  /** Sort key resolved against the schema (index, data type, and direction). */
  private case class CompiledSortKey(
      index: Int,
      attributeType: AttributeType,
      descending: Boolean
  )

  /** Lexicographic sort keys compiled once on first tuple. */
  private var compiledSortKeys: Array[CompiledSortKey] = _

  /** Stack of sorted buckets. Invariant: no two adjacent buckets have equal lengths. */
  private var sortedBuckets: ArrayBuffer[ArrayBuffer[Tuple]] = _

  /** Exposed for testing: current bucket sizes from bottom to top of the stack. */
  private[sort] def debugBucketSizes: List[Int] =
    if (sortedBuckets == null) Nil else sortedBuckets.filter(_ != null).map(_.size).toList

  /** Initialize internal state (empty bucket stack). */
  override def open(): Unit = {
    sortedBuckets = ArrayBuffer.empty[ArrayBuffer[Tuple]]
  }

  /** Release internal buffers. Schema/key state is left as-is; executors are single-use. */
  override def close(): Unit = {
    if (sortedBuckets != null) sortedBuckets.clear()
  }

  /**
   * Ingest a tuple. Defers emission until onFinish.
   *
   * Schema compilation happens on the first tuple (throws from
   * [[compileSortKeys]] if a configured attribute is missing).
   * Each tuple forms a size-1 sorted bucket that is pushed and possibly merged.
   */
  override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
    if (inputSchema == null) {
      inputSchema = tuple.getSchema
      compiledSortKeys = compileSortKeys(inputSchema)
    }
    val sizeOneBucket = ArrayBuffer[Tuple](tuple)
    pushBucketAndCombine(sizeOneBucket)
    Iterator.empty
  }

  /**
   * Emit all sorted tuples by collapsing the bucket stack left-to-right.
   * Stability is preserved because merge prefers the left bucket on equality
   * (older buckets sit lower on the stack, so insertion order wins ties).
   *
   * After this call the stack holds a single fully-merged bucket, so a second
   * invocation returns the same sorted sequence again.
   */
  override def onFinish(port: Int): Iterator[TupleLike] = {
    if (sortedBuckets.isEmpty) return Iterator.empty

    var accumulator = sortedBuckets(0)
    var bucketIdx = 1
    while (bucketIdx < sortedBuckets.length) {
      accumulator = mergeSortedBuckets(accumulator, sortedBuckets(bucketIdx))
      bucketIdx += 1
    }

    sortedBuckets.clear()
    sortedBuckets.append(accumulator)
    accumulator.iterator
  }

  /**
   * Resolve logical sort keys to schema indices and attribute types.
   * Outputs an array of compiled sort keys used by [[compareBySortKeys]].
   *
   * NOTE(review): assumes `desc.keys` is non-null (the JSON property is marked
   * required); a missing attribute surfaces as whatever `schema.getIndex`
   * throws — TODO confirm the error message is user-friendly.
   */
  private def compileSortKeys(schema: Schema): Array[CompiledSortKey] = {
    desc.keys.map { sortCriteria: SortCriteriaUnit =>
      val name = sortCriteria.attributeName
      val index = schema.getIndex(name)
      val dataType = schema.getAttribute(name).getType
      val isDescending = sortCriteria.sortPreference == SortPreference.DESC
      CompiledSortKey(index, dataType, isDescending)
    }.toArray
  }

  /**
   * Push an already-sorted bucket and perform "binary-carry" merges while the
   * top two buckets have equal sizes.
   *
   * Scope:
   *  - Internal helper. Called by [[processTuple]] for size-1 buckets;
   *
   * Expected output:
   *  - Updates the internal stack so that no two adjacent buckets have equal sizes.
   *
   * Limitations / possible issues:
   *  - The given bucket must already be sorted by [[compareBySortKeys]].
   *  - Stability relies on left-before-right merge order; do not reorder parameters.
   *
   * Complexity:
   *  - Amortized O(1) per push; total O(n log n) over n tuples.
   */
  private[sort] def pushBucketAndCombine(newBucket: ArrayBuffer[Tuple]): Unit = {
    sortedBuckets.append(newBucket)
    // Merge while top two buckets are equal-sized; left-before-right preserves stability.
    while (
      sortedBuckets.length >= 2 &&
      sortedBuckets(sortedBuckets.length - 1).size == sortedBuckets(sortedBuckets.length - 2).size
    ) {
      val right = sortedBuckets.remove(sortedBuckets.length - 1) // newer
      val left = sortedBuckets.remove(sortedBuckets.length - 1)  // older
      val merged = mergeSortedBuckets(left, right)
      sortedBuckets.append(merged)
    }
  }

  /**
   * Stable two-way merge of two buckets already sorted by [[compareBySortKeys]].
   *
   * Scope:
   *  - Internal helper used during incremental carries and final collapse.
   *
   * Expected output:
   *  - A new bucket with all elements of both inputs, globally sorted.
   *
   * Limitations / possible issues:
   *  - Both inputs must be sorted with the same key config; behavior is undefined otherwise.
   *  - Stability guarantee: if keys are equal, the element from the left bucket
   *    is emitted first (the `<= 0` comparison below is what enforces this).
   *
   * Complexity:
   *  - O(left.size + right.size)
   */
  private[sort] def mergeSortedBuckets(
      leftBucket: ArrayBuffer[Tuple],
      rightBucket: ArrayBuffer[Tuple]
  ): ArrayBuffer[Tuple] = {
    val outMerged = new ArrayBuffer[Tuple](leftBucket.size + rightBucket.size)
    var leftIndex = 0
    var rightIndex = 0
    while (leftIndex < leftBucket.size && rightIndex < rightBucket.size) {
      if (compareBySortKeys(leftBucket(leftIndex), rightBucket(rightIndex)) <= 0) {
        outMerged += leftBucket(leftIndex); leftIndex += 1
      } else {
        outMerged += rightBucket(rightIndex); rightIndex += 1
      }
    }
    // Drain whichever side still has elements; each loop runs for at most one side.
    while (leftIndex < leftBucket.size) { outMerged += leftBucket(leftIndex); leftIndex += 1 }
    while (rightIndex < rightBucket.size) { outMerged += rightBucket(rightIndex); rightIndex += 1 }
    outMerged
  }

  /**
   * Lexicographic comparison of two tuples using the compiled sort keys.
   *
   * Semantics:
   *  - Nulls are always ordered last, regardless of sort direction (the null
   *    branch below returns before the descending sign-flip is applied).
   *  - For non-null values, comparison is type-aware (see [[compareTypedNonNullValues]]).
   *  - If a key compares equal (including both-null), evaluation proceeds to the next key.
   *  - Descending reverses the sign of the base comparison.
   *
   * Limitations / possible issues:
   *  - Requires [[compiledSortKeys]] to be initialized; called after the first tuple.
   *  - For unsupported types, [[compareTypedNonNullValues]] throws IllegalStateException.
   */
  private def compareBySortKeys(left: Tuple, right: Tuple): Int = {
    var keyIndex = 0
    while (keyIndex < compiledSortKeys.length) {
      val currentKey = compiledSortKeys(keyIndex)
      val leftValue = left.getField[Any](currentKey.index)
      val rightValue = right.getField[Any](currentKey.index)

      // Null policy: ALWAYS last, regardless of ASC/DESC
      if (leftValue == null || rightValue == null) {
        if (leftValue == null && rightValue == null) {
          keyIndex += 1 // both null: tie on this key, fall through to next key
        } else {
          return if (leftValue == null) 1 else -1
        }
      } else {
        val base = compareTypedNonNullValues(leftValue, rightValue, currentKey.attributeType)
        if (base != 0) return if (currentKey.descending) -base else base
        keyIndex += 1
      }
    }
    0 // equal on all keys; merge stability decides the final order
  }

  /**
   * Compare two non-null values using their attribute type.
   *
   * For DOUBLE:
   *  - Uses java.lang.Double.compare (orders -Inf < ... < +Inf < NaN).
   *  - Callers if desired should define how NaN interacts with ASC/DESC and null policy.
   *
   * Numeric branches go through Number, so boxed Integer/Long/Double all work.
   * STRING comparison is case-sensitive (Java String.compareTo).
   */
  private def compareTypedNonNullValues(
      leftValue: Any,
      rightValue: Any,
      attrType: AttributeType
  ): Int =
    attrType match {
      case AttributeType.INTEGER =>
        java.lang.Integer.compare(
          leftValue.asInstanceOf[Number].intValue(),
          rightValue.asInstanceOf[Number].intValue()
        )
      case AttributeType.LONG =>
        java.lang.Long.compare(
          leftValue.asInstanceOf[Number].longValue(),
          rightValue.asInstanceOf[Number].longValue()
        )
      case AttributeType.DOUBLE =>
        java.lang.Double.compare(
          leftValue.asInstanceOf[Number].doubleValue(),
          rightValue.asInstanceOf[Number].doubleValue()
        )
      case AttributeType.BOOLEAN =>
        java.lang.Boolean.compare(leftValue.asInstanceOf[Boolean], rightValue.asInstanceOf[Boolean])
      case AttributeType.TIMESTAMP =>
        leftValue
          .asInstanceOf[java.sql.Timestamp]
          .compareTo(rightValue.asInstanceOf[java.sql.Timestamp])
      case AttributeType.STRING =>
        leftValue.asInstanceOf[String].compareTo(rightValue.asInstanceOf[String])
      case other =>
        throw new IllegalStateException(s"Unsupported attribute type $other in StableMergeSort")
    }
}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala
new file mode 100644
index 0000000000..9eead41cf1
--- /dev/null
+++
b/common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala
@@ -0,0 +1,708 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.operator.sort
+
+import org.apache.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.sql.Timestamp
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.jdk.CollectionConverters.IterableHasAsJava
+
+/**
+ * Integration and internal-behavior tests for [[StableMergeSortOpExec]].
+ *
+ * Scope & coverage:
+ * - Single-key semantics across core types (BOOLEAN, INTEGER/LONG, DOUBLE,
STRING, TIMESTAMP).
+ * - Multi-key lexicographic ordering (mixed directions/types) with null/NaN
handling.
+ * - Stability guarantees (relative order preserved for equal keys) and
pass-through when no keys.
+ * - Incremental “bucket stack” invariants (binary-carry sizes; no adjacent
equal sizes).
+ * - Operational properties (buffering behavior, idempotent onFinish, scale
sanity).
+ * - Test hooks for internal merge logic (mergeSortedBuckets,
pushBucketAndCombine).
+ *
+ * Null policy:
+ * - Nulls are always ordered last, regardless of ASC/DESC.
+ * - NaN participates as a non-null floating value per Double comparison
semantics.
+ *
+ * Notes:
+ * - Some tests rely on package-visible test hooks to validate internals
deterministically.
+ */
+class StableMergeSortOpExecSpec extends AnyFlatSpec {
+
+ //
===========================================================================
+ // Helpers
+ //
===========================================================================
+
+ /** Build a Schema with (name, type) pairs, in-order. */
+ private def schemaOf(attributes: (String, AttributeType)*): Schema = {
+ attributes.foldLeft(Schema()) {
+ case (acc, (name, attrType)) => acc.add(new Attribute(name, attrType))
+ }
+ }
+
+ /**
+ * Construct a Tuple for the provided schema.
+ *
+ * @param values map-like varargs: "colName" -> value. Must provide every
column.
+ * @throws NoSuchElementException if a provided key is not in the schema.
+ */
+ private def tupleOf(schema: Schema, values: (String, Any)*): Tuple = {
+ val valueMap = values.toMap
+ val builder = Tuple.builder(schema)
+ schema.getAttributeNames.asJava.forEach { name =>
+ builder.add(schema.getAttribute(name), valueMap(name))
+ }
+ builder.build()
+ }
+
+ /** Convenience builder for a single sort key with direction (ASC by
default). */
+ private def sortKey(
+ attribute: String,
+ pref: SortPreference = SortPreference.ASC
+ ): SortCriteriaUnit = {
+ val criteria = new SortCriteriaUnit()
+ criteria.attributeName = attribute
+ criteria.sortPreference = pref
+ criteria
+ }
+
+ /** Convert varargs keys into the operator config buffer. */
+ private def sortKeysBuffer(keys: SortCriteriaUnit*):
ListBuffer[SortCriteriaUnit] =
+ ListBuffer(keys: _*)
+
+ /**
+ * Run the operator on an in-memory sequence of tuples and capture all
output.
+ * Output is only emitted at onFinish to preserve determinism.
+ */
+ private def runStableMergeSort(
+ schema: Schema,
+ tuples: Seq[Tuple]
+ )(configure: StableMergeSortOpDesc => Unit): List[Tuple] = {
+ val desc = new StableMergeSortOpDesc()
+ configure(desc)
+ val exec = new StableMergeSortOpExec(objectMapper.writeValueAsString(desc))
+ exec.open()
+ tuples.foreach(t => exec.processTuple(t, 0))
+ val result = exec.onFinish(0).map(_.asInstanceOf[Tuple]).toList
+ exec.close()
+ result
+ }
+
+ /** Internal test hook to read the current bucket sizes on the stack. */
+ private def getBucketSizes(exec: StableMergeSortOpExec): List[Int] =
exec.debugBucketSizes
+
+ /** Decompose an integer into its set-bit powers of two (ascending).
+ * Used to check the binary-carry invariant.
+ */
+ private def binaryDecomposition(number: Int): List[Int] = {
+ var remaining = number
+ val powers = scala.collection.mutable.ListBuffer[Int]()
+ while (remaining > 0) {
+ val lowestSetBit = Integer.lowestOneBit(remaining)
+ powers += lowestSetBit
+ remaining -= lowestSetBit
+ }
+ powers.toList
+ }
+
+ //
===========================================================================
+ // A. Single-key semantics
+ //
===========================================================================
+
+ "StableMergeSortOpExec" should "sort integers ascending and preserve
duplicate order" in {
+ val schema = schemaOf("value" -> AttributeType.INTEGER, "label" ->
AttributeType.STRING)
+ val tuples = List(
+ tupleOf(schema, "value" -> 3, "label" -> "a"),
+ tupleOf(schema, "value" -> 1, "label" -> "first-1"),
+ tupleOf(schema, "value" -> 2, "label" -> "b"),
+ tupleOf(schema, "value" -> 1, "label" -> "first-2"),
+ tupleOf(schema, "value" -> 3, "label" -> "c")
+ )
+ val result = runStableMergeSort(schema, tuples) { _.keys =
sortKeysBuffer(sortKey("value")) }
+ assert(result.map(_.getField[Int]("value")) == List(1, 1, 2, 3, 3))
+ val labelsForOnes =
+ result.filter(_.getField[Int]("value") ==
1).map(_.getField[String]("label"))
+ assert(labelsForOnes == List("first-1", "first-2"))
+ }
+
+ it should "sort integers descending while preserving stability" in {
+ val schema = schemaOf("value" -> AttributeType.INTEGER, "label" ->
AttributeType.STRING)
+ val tuples = List(
+ tupleOf(schema, "value" -> 2, "label" -> "first"),
+ tupleOf(schema, "value" -> 2, "label" -> "second"),
+ tupleOf(schema, "value" -> 1, "label" -> "third"),
+ tupleOf(schema, "value" -> 3, "label" -> "fourth")
+ )
+ val result = runStableMergeSort(schema, tuples) {
+ _.keys = sortKeysBuffer(sortKey("value", SortPreference.DESC))
+ }
+ assert(result.map(_.getField[Int]("value")) == List(3, 2, 2, 1))
+ val labelsForTwos =
+ result.filter(_.getField[Int]("value") ==
2).map(_.getField[String]("label"))
+ assert(labelsForTwos == List("first", "second"))
+ }
+
+ it should "handle string ordering (case-sensitive)" in {
+ val schema = schemaOf("name" -> AttributeType.STRING)
+ val tuples = List(
+ tupleOf(schema, "name" -> "apple"),
+ tupleOf(schema, "name" -> "Banana"),
+ tupleOf(schema, "name" -> "banana"),
+ tupleOf(schema, "name" -> "APPLE")
+ )
+ val sorted = runStableMergeSort(schema, tuples) {
+ _.keys = sortKeysBuffer(sortKey("name", SortPreference.ASC))
+ }
+ assert(sorted.map(_.getField[String]("name")) == List("APPLE", "Banana",
"apple", "banana"))
+ }
+
+ it should "order ASCII strings by Java compareTo (punctuation < digits <
uppercase < lowercase)" in {
+ val schema = schemaOf("str" -> AttributeType.STRING)
+ val tuples = List("a", "A", "0", "~", "!").map(s => tupleOf(schema, "str"
-> s))
+ val result = runStableMergeSort(schema, tuples) { _.keys =
sortKeysBuffer(sortKey("str")) }
+ assert(result.map(_.getField[String]("str")) == List("!", "0", "A", "a",
"~"))
+ }
+
+ it should "sort negatives and zeros correctly" in {
+ val schema = schemaOf("value" -> AttributeType.INTEGER)
+ val tuples = List(0, -1, -10, 5, -3, 2).map(v => tupleOf(schema, "value"
-> v))
+ val result = runStableMergeSort(schema, tuples) { _.keys =
sortKeysBuffer(sortKey("value")) }
+ assert(result.map(_.getField[Int]("value")) == List(-10, -3, -1, 0, 2, 5))
+ }
+
+ it should "sort LONG values ascending" in {
+ val schema = schemaOf("id" -> AttributeType.LONG)
+ val tuples = List(5L, 1L, 3L, 9L, 0L).map(v => tupleOf(schema, "id" -> v))
+ val result = runStableMergeSort(schema, tuples) { _.keys =
sortKeysBuffer(sortKey("id")) }
+ assert(result.map(_.getField[Long]("id")) == List(0L, 1L, 3L, 5L, 9L))
+ }
+
+ it should "sort TIMESTAMP ascending" in {
+ val schema = schemaOf("timestamp" -> AttributeType.TIMESTAMP)
+ val base = Timestamp.valueOf("2022-01-01 00:00:00")
+ val tuples = List(
+ new Timestamp(base.getTime + 4000),
+ new Timestamp(base.getTime + 1000),
+ new Timestamp(base.getTime + 3000),
+ new Timestamp(base.getTime + 2000)
+ ).map(ts => tupleOf(schema, "timestamp" -> ts))
+ val result = runStableMergeSort(schema, tuples) {
+ _.keys = sortKeysBuffer(sortKey("timestamp", SortPreference.ASC))
+ }
+ val times = result.map(_.getField[Timestamp]("timestamp").getTime)
+ assert(times == times.sorted)
+ }
+
+ it should "sort TIMESTAMP descending" in {
+ val schema = schemaOf("timestamp" -> AttributeType.TIMESTAMP)
+ val base = Timestamp.valueOf("2023-01-01 00:00:00")
+ val tuples = List(
+ new Timestamp(base.getTime + 3000),
+ base,
+ new Timestamp(base.getTime + 1000),
+ new Timestamp(base.getTime + 2000)
+ ).map(ts => tupleOf(schema, "timestamp" -> ts))
+ val result = runStableMergeSort(schema, tuples) {
+ _.keys = sortKeysBuffer(sortKey("timestamp", SortPreference.DESC))
+ }
+ val times = result.map(_.getField[Timestamp]("timestamp").getTime)
+ assert(times == times.sorted(Ordering.Long.reverse))
+ }
+
+ it should "treat numeric strings as strings (lexicographic ordering)" in {
+ val schema = schemaOf("str" -> AttributeType.STRING)
+ val tuples = List("2", "10", "1", "11", "20").map(s => tupleOf(schema,
"str" -> s))
+ val result = runStableMergeSort(schema, tuples) { _.keys =
sortKeysBuffer(sortKey("str")) }
+ assert(result.map(_.getField[String]("str")) == List("1", "10", "11", "2",
"20"))
+ }
+
+ it should "sort BOOLEAN ascending (false < true) and descending" in {
+ val schema = schemaOf("bool" -> AttributeType.BOOLEAN)
+ val tuples = List(true, false, true, false).map(v => tupleOf(schema,
"bool" -> v))
+ val asc = runStableMergeSort(schema, tuples) {
+ _.keys = sortKeysBuffer(sortKey("bool", SortPreference.ASC))
+ }
+ assert(asc.map(_.getField[Boolean]("bool")) == List(false, false, true,
true))
+ val desc = runStableMergeSort(schema, tuples) {
+ _.keys = sortKeysBuffer(sortKey("bool", SortPreference.DESC))
+ }
+ assert(desc.map(_.getField[Boolean]("bool")) == List(true, true, false,
false))
+ }
+
+ //
===========================================================================
+ // B. Floating-point & Null/NaN policy
+ //
===========================================================================
+
+ it should "sort DOUBLE values including -0.0, 0.0, infinities and NaN" in {
+ val schema = schemaOf("x" -> AttributeType.DOUBLE)
+ val tuples =
+ List(Double.NaN, Double.PositiveInfinity, 1.5, -0.0, 0.0, -3.2,
Double.NegativeInfinity)
+ .map(v => tupleOf(schema, "x" -> v))
+ val result = runStableMergeSort(schema, tuples) {
+ _.keys = sortKeysBuffer(sortKey("x"))
+ }
+ val values = result.map(_.getField[Double]("x"))
+ assert(values.head == Double.NegativeInfinity)
+ assert(values(1) == -3.2)
+ assert(java.lang.Double.compare(values(2), -0.0) == 0)
+ assert(java.lang.Double.compare(values(3), 0.0) == 0)
+ assert(values(4) == 1.5)
+ assert(values(5) == Double.PositiveInfinity)
+ assert(java.lang.Double.isNaN(values(6)))
+ }
+
+ it should "place NaN before null when sorting DOUBLE ascending (nulls last
policy)" in {
+ val schema = schemaOf("x" -> AttributeType.DOUBLE)
+ val tuples = List(
+ tupleOf(schema, "x" -> null),
+ tupleOf(schema, "x" -> Double.NaN),
+ tupleOf(schema, "x" -> Double.NegativeInfinity),
+ tupleOf(schema, "x" -> 1.0),
+ tupleOf(schema, "x" -> Double.PositiveInfinity),
+ tupleOf(schema, "x" -> null)
+ )
+ val result = runStableMergeSort(schema, tuples) { _.keys =
sortKeysBuffer(sortKey("x")) }
+ val values = result.map(_.getField[java.lang.Double]("x"))
+ assert(values.take(4).forall(_ != null)) // first 4 are non-null
+ assert(values(0).isInfinite && values(0) == Double.NegativeInfinity)
+ assert(values(1) == 1.0)
+ assert(values(2).isInfinite && values(2) == Double.PositiveInfinity)
+ assert(java.lang.Double.isNaN(values(3)))
+ assert(values.drop(4).forall(_ == null))
+ }
+
+ it should "place nulls last regardless of ascending or descending" in {
+ val schema = schemaOf("value" -> AttributeType.INTEGER, "label" ->
AttributeType.STRING)
+ val tuples = List(
+ tupleOf(schema, "value" -> null, "label" -> "null-1"),
+ tupleOf(schema, "value" -> 5, "label" -> "five"),
+ tupleOf(schema, "value" -> null, "label" -> "null-2"),
+ tupleOf(schema, "value" -> 3, "label" -> "three")
+ )
+ val asc = runStableMergeSort(schema, tuples) {
+ _.keys = sortKeysBuffer(sortKey("value", SortPreference.ASC))
+ }
+ assert(asc.map(_.getField[String]("label")) == List("three", "five",
"null-1", "null-2"))
+
+ val desc = runStableMergeSort(schema, tuples) {
+ _.keys = sortKeysBuffer(sortKey("value", SortPreference.DESC))
+ }
+ assert(desc.map(_.getField[String]("label")) == List("five", "three",
"null-1", "null-2"))
+ }
+
+ it should "order NaN highest on secondary DESC but still place nulls last"
in {
+ val schema = schemaOf(
+ "group" -> AttributeType.STRING,
+ "score" -> AttributeType.DOUBLE,
+ "label" -> AttributeType.STRING
+ )
+ val tuples = List(
+ tupleOf(schema, "group" -> "A", "score" -> java.lang.Double.NaN, "label"
-> "nan"),
+ tupleOf(schema, "group" -> "A", "score" -> Double.PositiveInfinity,
"label" -> "pinf"),
+ tupleOf(schema, "group" -> "A", "score" -> 1.0, "label" -> "one"),
+ tupleOf(schema, "group" -> "A", "score" -> 0.0, "label" -> "zero"),
+ tupleOf(schema, "group" -> "A", "score" -> -1.0, "label" -> "neg"),
+ tupleOf(schema, "group" -> "A", "score" -> Double.NegativeInfinity,
"label" -> "ninf"),
+ tupleOf(schema, "group" -> "A", "score" -> null, "label" -> "null-1"),
+ tupleOf(schema, "group" -> "A", "score" -> null, "label" -> "null-2")
+ )
+ val result = runStableMergeSort(schema, tuples) { desc =>
+ desc.keys =
+ sortKeysBuffer(sortKey("group", SortPreference.ASC), sortKey("score",
SortPreference.DESC))
+ }
+ assert(
+ result.map(_.getField[String]("label")) ==
+ List("nan", "pinf", "one", "zero", "neg", "ninf", "null-1", "null-2")
+ )
+ }
+
+ //
===========================================================================
+ // C. Multi-key semantics (lexicographic)
+ //
===========================================================================
+
+ it should "support multi-key sorting with mixed attribute types" in {
+ val schema = schemaOf(
+ "dept" -> AttributeType.STRING,
+ "score" -> AttributeType.DOUBLE,
+ "name" -> AttributeType.STRING,
+ "hired" -> AttributeType.TIMESTAMP
+ )
+ val base = new Timestamp(Timestamp.valueOf("2020-01-01 00:00:00").getTime)
+ val tuples = List(
+ tupleOf(schema, "dept" -> "Sales", "score" -> 9.5, "name" -> "Alice",
"hired" -> base),
+ tupleOf(
+ schema,
+ "dept" -> "Sales",
+ "score" -> 9.5,
+ "name" -> "Bob",
+ "hired" -> new Timestamp(base.getTime + 1000)
+ ),
+ tupleOf(
+ schema,
+ "dept" -> "Sales",
+ "score" -> 8.0,
+ "name" -> "Carol",
+ "hired" -> new Timestamp(base.getTime + 2000)
+ ),
+ tupleOf(
+ schema,
+ "dept" -> "Engineering",
+ "score" -> 9.5,
+ "name" -> "Dave",
+ "hired" -> new Timestamp(base.getTime + 3000)
+ ),
+ tupleOf(
+ schema,
+ "dept" -> null,
+ "score" -> 9.5,
+ "name" -> "Eve",
+ "hired" -> new Timestamp(base.getTime + 4000)
+ )
+ )
+ val result = runStableMergeSort(schema, tuples) { desc =>
+ desc.keys = sortKeysBuffer(
+ sortKey("dept", SortPreference.ASC),
+ sortKey("score", SortPreference.DESC),
+ sortKey("name", SortPreference.ASC)
+ )
+ }
+ assert(result.map(_.getField[String]("name")) == List("Dave", "Alice",
"Bob", "Carol", "Eve"))
+ }
+
+ it should "handle multi-key with descending primary and ascending secondary"
in {
+ val schema = schemaOf(
+ "major" -> AttributeType.INTEGER,
+ "minor" -> AttributeType.INTEGER,
+ "idx" -> AttributeType.INTEGER
+ )
+ val tuples = List(
+ (1, 9, 0),
+ (1, 1, 1),
+ (2, 5, 2),
+ (2, 3, 3),
+ (1, 1, 4),
+ (3, 0, 5),
+ (3, 2, 6)
+ ).map { case (ma, mi, i) => tupleOf(schema, "major" -> ma, "minor" -> mi,
"idx" -> i) }
+ val result = runStableMergeSort(schema, tuples) { desc =>
+ desc.keys =
+ sortKeysBuffer(sortKey("major", SortPreference.DESC), sortKey("minor",
SortPreference.ASC))
+ }
+ val pairs = result.map(t => (t.getField[Int]("major"),
t.getField[Int]("minor")))
+ assert(pairs == List((3, 0), (3, 2), (2, 3), (2, 5), (1, 1), (1, 1), (1,
9)))
+ val idxFor11 = result
+ .filter(t => t.getField[Int]("major") == 1 && t.getField[Int]("minor")
== 1)
+ .map(_.getField[Int]("idx"))
+ assert(idxFor11 == List(1, 4))
+ }
+
+ it should "use the third key as a tiebreaker (ASC, ASC, then DESC)" in {
+ val schema = schemaOf(
+ "keyA" -> AttributeType.INTEGER,
+ "keyB" -> AttributeType.INTEGER,
+ "keyC" -> AttributeType.INTEGER,
+ "id" -> AttributeType.STRING
+ )
+ val tuples = List(
+ (1, 1, 1, "x1"),
+ (1, 1, 3, "x3"),
+ (1, 1, 2, "x2"),
+ (1, 0, 9, "y9")
+ ).map {
+ case (a, b, c, id) => tupleOf(schema, "keyA" -> a, "keyB" -> b, "keyC"
-> c, "id" -> id)
+ }
+ val result = runStableMergeSort(schema, tuples) {
+ _.keys =
+ sortKeysBuffer(sortKey("keyA"), sortKey("keyB"), sortKey("keyC",
SortPreference.DESC))
+ }
+ assert(result.map(_.getField[String]("id")) == List("y9", "x3", "x2",
"x1"))
+ }
+
+ it should "place nulls last across multiple keys (primary ASC, secondary
DESC)" in {
+ val schema = schemaOf("keyA" -> AttributeType.STRING, "keyB" ->
AttributeType.INTEGER)
+ val tuples = List(
+ ("x", 2),
+ (null, 1),
+ ("x", 1),
+ (null, 5),
+ ("a", 9),
+ ("a", 2)
+ ).map { case (s, i) => tupleOf(schema, "keyA" -> s, "keyB" -> i) }
+ val result = runStableMergeSort(schema, tuples) { desc =>
+ desc.keys =
+ sortKeysBuffer(sortKey("keyA", SortPreference.ASC), sortKey("keyB",
SortPreference.DESC))
+ }
+ val out = result.map(t => (t.getField[String]("keyA"),
t.getField[Int]("keyB")))
+ assert(out == List(("a", 9), ("a", 2), ("x", 2), ("x", 1), (null, 5),
(null, 1)))
+ }
+
+ it should "when primary keys are both null, fall back to secondary ASC
(nulls still after non-nulls)" in {
+ val schema = schemaOf(
+ "keyA" -> AttributeType.STRING,
+ "keyB" -> AttributeType.INTEGER,
+ "id" -> AttributeType.STRING
+ )
+ val tuples = List(
+ tupleOf(schema, "keyA" -> "A", "keyB" -> 2, "id" -> "non-null-a"),
+ tupleOf(schema, "keyA" -> null, "keyB" -> 5, "id" -> "null-a-5"),
+ tupleOf(schema, "keyA" -> null, "keyB" -> 1, "id" -> "null-a-1"),
+ tupleOf(schema, "keyA" -> "B", "keyB" -> 9, "id" -> "non-null-b")
+ )
+ val result = runStableMergeSort(schema, tuples) {
+ _.keys = sortKeysBuffer(sortKey("keyA"), sortKey("keyB"))
+ }
+ assert(
+ result
+ .map(_.getField[String]("id")) == List("non-null-a", "non-null-b",
"null-a-1", "null-a-5")
+ )
+ }
+
+ //
===========================================================================
+ // D. Stability & operational behaviors
+ //
===========================================================================
+
+ it should "preserve original order among tuples with equal keys" in {
+ val schema = schemaOf("key" -> AttributeType.INTEGER, "index" ->
AttributeType.INTEGER)
+ val tuples = (0 until 100).map(i => tupleOf(schema, "key" -> (i % 5),
"index" -> i))
+ val result = runStableMergeSort(schema, tuples) { _.keys =
sortKeysBuffer(sortKey("key")) }
+ val grouped = result.groupBy(_.getField[Int]("key")).values
+ grouped.foreach { group =>
+ val indices = group.map(_.getField[Int]("index"))
+ assert(indices == indices.sorted)
+ }
+ }
+
+ it should "act as a stable pass-through when keys are empty" in {
+ val schema = schemaOf("value" -> AttributeType.INTEGER, "label" ->
AttributeType.STRING)
+ val tuples = List(3, 1, 4, 1, 5, 9).zipWithIndex
+ .map { case (v, i) => tupleOf(schema, "value" -> v, "label" ->
s"row-$i") }
+ val result = runStableMergeSort(schema, tuples) { desc =>
+ desc.keys = ListBuffer.empty[SortCriteriaUnit]
+ }
+ assert(
+ result.map(_.getField[String]("label")) ==
+ List("row-0", "row-1", "row-2", "row-3", "row-4", "row-5")
+ )
+ }
+
+ it should "buffer tuples until onFinish is called" in {
+ val schema = schemaOf("value" -> AttributeType.INTEGER)
+ val tuple = tupleOf(schema, "value" -> 2)
+ val desc = new StableMergeSortOpDesc(); desc.keys =
sortKeysBuffer(sortKey("value"))
+ val exec = new StableMergeSortOpExec(objectMapper.writeValueAsString(desc))
+ exec.open()
+ val immediate = exec.processTuple(tuple, 0)
+ assert(immediate.isEmpty)
+ val result = exec.onFinish(0).map(_.asInstanceOf[Tuple]).toList
+ assert(result.map(_.getField[Int]("value")) == List(2))
+ exec.close()
+ }
+
+ it should "return empty for empty input" in {
+ val schema = schemaOf("value" -> AttributeType.INTEGER)
+ val result = runStableMergeSort(schema, Seq.empty) { _.keys =
sortKeysBuffer(sortKey("value")) }
+ assert(result.isEmpty)
+ }
+
+ it should "handle single element input" in {
+ val schema = schemaOf("value" -> AttributeType.INTEGER)
+ val result = runStableMergeSort(schema, Seq(tupleOf(schema, "value" ->
42))) {
+ _.keys = sortKeysBuffer(sortKey("value"))
+ }
+ assert(result.map(_.getField[Int]("value")) == List(42))
+ }
+
+ it should "sort large inputs efficiently (sanity on boundaries)" in {
+ val schema = schemaOf("value" -> AttributeType.INTEGER, "label" ->
AttributeType.STRING)
+ val tuples = (50000 to 1 by -1).map(i => tupleOf(schema, "value" -> i,
"label" -> s"row-$i"))
+ val result = runStableMergeSort(schema, tuples) { _.keys =
sortKeysBuffer(sortKey("value")) }
+ assert(result.head.getField[Int]("value") == 1)
+ assert(result(1).getField[Int]("value") == 2)
+ assert(result.takeRight(2).map(_.getField[Int]("value")) == List(49999,
50000))
+ }
+
+ //
===========================================================================
+ // E. Incremental bucket invariants (binary-carry & no-adjacent-equal)
+ //
===========================================================================
+
+ it should "merge incrementally: bucket sizes match binary decomposition
after each push" in {
+ val schema = schemaOf("value" -> AttributeType.INTEGER)
+ val desc = new StableMergeSortOpDesc(); desc.keys =
sortKeysBuffer(sortKey("value"))
+ val exec = new StableMergeSortOpExec(objectMapper.writeValueAsString(desc))
+ exec.open()
+
+ val totalCount = 64
+ for (index <- (totalCount - 1) to 0 by -1) {
+ exec.processTuple(tupleOf(schema, "value" -> index), 0)
+ val sizes = getBucketSizes(exec).sorted
+ assert(sizes == binaryDecomposition(totalCount - index))
+ }
+
+ exec.close()
+ }
+
+ it should "maintain bucket-stack invariant (no adjacent equal sizes) after
each insertion" in {
+ val schema = schemaOf("value" -> AttributeType.INTEGER)
+ val desc = new StableMergeSortOpDesc(); desc.keys =
sortKeysBuffer(sortKey("value"))
+ val exec = new StableMergeSortOpExec(objectMapper.writeValueAsString(desc))
+ exec.open()
+
+ val totalCount = 200
+ val stream = (0 until totalCount by 2) ++ (1 until totalCount by 2)
+ stream.foreach { index =>
+ exec.processTuple(tupleOf(schema, "value" -> (totalCount - 1 - index)),
0)
+ val sizes = getBucketSizes(exec)
+ sizes.sliding(2).foreach { pair =>
+ if (pair.length == 2) assert(pair.head != pair.last)
+ }
+ }
+
+ exec.close()
+ }
+
+ it should "form expected bucket sizes at milestones (1,2,3,4,7,8,15,16)" in {
+ val schema = schemaOf("value" -> AttributeType.INTEGER)
+ val desc = new StableMergeSortOpDesc(); desc.keys =
sortKeysBuffer(sortKey("value"))
+ val exec = new StableMergeSortOpExec(objectMapper.writeValueAsString(desc))
+ exec.open()
+
+ val inputSequence = (100 to 1 by -1).map(i => tupleOf(schema, "value" ->
i))
+ val milestones = Set(1, 2, 3, 4, 7, 8, 15, 16)
+ var pushed = 0
+ inputSequence.foreach { t =>
+ exec.processTuple(t, 0); pushed += 1
+ if (milestones.contains(pushed)) {
+ val sizes = getBucketSizes(exec).sorted
+ assert(sizes == binaryDecomposition(pushed))
+ }
+ }
+
+ exec.close()
+ }
+
+ //
===========================================================================
+ // F. Internal hooks — merge behavior
+ //
===========================================================================
+
+ "mergeSortedBuckets" should "be stable: left bucket wins on equal keys" in {
+ val schema = schemaOf("key" -> AttributeType.INTEGER, "id" ->
AttributeType.STRING)
+ val desc = new StableMergeSortOpDesc(); desc.keys =
sortKeysBuffer(sortKey("key"))
+ val exec = new
StableMergeSortOpExec(objectMapper.writeValueAsString(desc)); exec.open()
+
+ // Seed to resolve schema/keys once.
+ exec.processTuple(tupleOf(schema, "key" -> 0, "id" -> "seed"), 0)
+
+ val left = ArrayBuffer(
+ tupleOf(schema, "key" -> 1, "id" -> "L1"),
+ tupleOf(schema, "key" -> 2, "id" -> "L2")
+ )
+ val right = ArrayBuffer(
+ tupleOf(schema, "key" -> 1, "id" -> "R1"),
+ tupleOf(schema, "key" -> 3, "id" -> "R3")
+ )
+
+ val merged = exec.mergeSortedBuckets(left, right)
+ val ids = merged.map(_.getField[String]("id")).toList
+ assert(ids == List("L1", "R1", "L2", "R3"))
+ exec.close()
+ }
+
+ "mergeSortedBuckets" should "handle empty left bucket" in {
+ val schema = schemaOf("key" -> AttributeType.INTEGER, "id" ->
AttributeType.STRING)
+ val desc = new StableMergeSortOpDesc(); desc.keys =
sortKeysBuffer(sortKey("key"))
+ val exec = new
StableMergeSortOpExec(objectMapper.writeValueAsString(desc)); exec.open()
+ exec.processTuple(tupleOf(schema, "key" -> 0, "id" -> "seed"), 0) // seed
keys
+
+ val left = ArrayBuffer.empty[Tuple]
+ val right = ArrayBuffer(
+ tupleOf(schema, "key" -> 1, "id" -> "r1"),
+ tupleOf(schema, "key" -> 2, "id" -> "r2")
+ )
+ val merged = exec.mergeSortedBuckets(left, right)
+ assert(merged.map(_.getField[String]("id")).toList == List("r1", "r2"))
+ exec.close()
+ }
+
+ "mergeSortedBuckets" should "handle empty right bucket" in {
+ val schema = schemaOf("key" -> AttributeType.INTEGER, "id" ->
AttributeType.STRING)
+ val desc = new StableMergeSortOpDesc(); desc.keys =
sortKeysBuffer(sortKey("key"))
+ val exec = new
StableMergeSortOpExec(objectMapper.writeValueAsString(desc)); exec.open()
+ exec.processTuple(tupleOf(schema, "key" -> 0, "id" -> "seed"), 0)
+
+ val left = ArrayBuffer(
+ tupleOf(schema, "key" -> 1, "id" -> "l1"),
+ tupleOf(schema, "key" -> 2, "id" -> "l2")
+ )
+ val right = ArrayBuffer.empty[Tuple]
+ val merged = exec.mergeSortedBuckets(left, right)
+ assert(merged.map(_.getField[String]("id")).toList == List("l1", "l2"))
+ exec.close()
+ }
+
+ //
===========================================================================
+ // G. Internal hooks — push/finish/idempotence & schema errors
+ //
===========================================================================
+
+ "pushBucketAndCombine" should "merge two size-2 buckets into size-4 on push
(with existing size-1 seed)" in {
+ val schema = schemaOf("value" -> AttributeType.INTEGER)
+ val desc = new StableMergeSortOpDesc(); desc.keys =
sortKeysBuffer(sortKey("value"))
+ val exec = new
StableMergeSortOpExec(objectMapper.writeValueAsString(desc)); exec.open()
+
+ // seed to compile keys -> results in one size-1 bucket in the stack
+ exec.processTuple(tupleOf(schema, "value" -> 0), 0)
+
+ // two pre-sorted buckets of size 2
+ val bucket1 = ArrayBuffer(tupleOf(schema, "value" -> 1), tupleOf(schema,
"value" -> 3))
+ val bucket2 = ArrayBuffer(tupleOf(schema, "value" -> 2), tupleOf(schema,
"value" -> 4))
+
+ exec.pushBucketAndCombine(bucket1) // sizes now [1,2]
+ exec.pushBucketAndCombine(bucket2) // equal top [2,2] => merged to 4;
sizes [1,4]
+
+ val sizes = getBucketSizes(exec)
+ assert(sizes == List(1, 4))
+ exec.close()
+ }
+
+ it should "return the same sorted output if onFinish is called twice in a
row" in {
+ val schema = schemaOf("value" -> AttributeType.INTEGER)
+ val desc = new StableMergeSortOpDesc(); desc.keys =
sortKeysBuffer(sortKey("value"))
+ val exec = new
StableMergeSortOpExec(objectMapper.writeValueAsString(desc)); exec.open()
+ List(3, 1, 2).foreach(i => exec.processTuple(tupleOf(schema, "value" ->
i), 0))
+
+ val first =
exec.onFinish(0).map(_.asInstanceOf[Tuple]).toList.map(_.getField[Int]("value"))
+ val second =
exec.onFinish(0).map(_.asInstanceOf[Tuple]).toList.map(_.getField[Int]("value"))
+ assert(first == List(1, 2, 3))
+ assert(second == List(1, 2, 3))
+ exec.close()
+ }
+
+ it should "have processTuple always return empty iterators until finish" in {
+ val schema = schemaOf("value" -> AttributeType.INTEGER)
+ val desc = new StableMergeSortOpDesc(); desc.keys =
sortKeysBuffer(sortKey("value"))
+ val exec = new
StableMergeSortOpExec(objectMapper.writeValueAsString(desc)); exec.open()
+ val immediates = (10 to 1 by -1).map(i =>
exec.processTuple(tupleOf(schema, "value" -> i), 0))
+ assert(immediates.forall(_.isEmpty))
+ val out =
exec.onFinish(0).map(_.asInstanceOf[Tuple]).toList.map(_.getField[Int]("value"))
+ assert(out == (1 to 10).toList)
+ exec.close()
+ }
+
+}
diff --git a/frontend/src/assets/operator_images/StableMergeSort.png
b/frontend/src/assets/operator_images/StableMergeSort.png
new file mode 100644
index 0000000000..bf9ec5632d
Binary files /dev/null and
b/frontend/src/assets/operator_images/StableMergeSort.png differ