Yicong-Huang commented on code in PR #3774:
URL: https://github.com/apache/texera/pull/3774#discussion_r2438070232


##########
common/workflow-operator/src/main/scala/org/apache/amber/operator/sort/StableMergeSortOpExec.scala:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.operator.sort
+
+import org.apache.amber.core.executor.OperatorExecutor
+import org.apache.amber.core.tuple.{AttributeType, Schema, Tuple, TupleLike}
+import org.apache.amber.util.JSONUtils.objectMapper
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Stable in-memory merge sort for a single input partition.
+ *
+ * Strategy:
+ *  - Buffer incoming tuples as size-1 sorted buckets.
+ *  - Maintain a stack of buckets where adjacent buckets never share the same length.
+ *  - On each push, perform "binary-carry" merges while the top two buckets have equal sizes.
+ *  - At finish, collapse the stack left-to-right. Merging is stable (left wins on ties).
+ *
+ * Null policy:
+ *  - Nulls are always ordered last, regardless of ascending/descending per key.
+ */
+class StableMergeSortOpExec(descString: String) extends OperatorExecutor {
+
+  /** Operator configuration, deserialized from the JSON string passed to the constructor. */
+  private val desc: StableMergeSortOpDesc =
+    objectMapper.readValue(descString, classOf[StableMergeSortOpDesc])
+
+  /** Input schema, captured from the first tuple seen by processTuple (null until then). */
+  private var inputSchema: Schema = _
+
+  /** Sort key resolved against the schema (index, data type, and direction). */
+  private case class CompiledSortKey(
+                                      index: Int,
+                                      attributeType: AttributeType,
+                                      descending: Boolean
+                                    )
+
+  /** Lexicographic sort keys, compiled once when the first tuple arrives (null until then). */
+  private var compiledSortKeys: Array[CompiledSortKey] = _
+
+  /**
+   * Stack of sorted buckets, stored bottom to top; onFinish merges them in that order.
+   * Invariant: no two adjacent buckets have equal lengths.
+   * Allocated in open() and cleared in close(); null if open() has not been called.
+   */
+  private var sortedBuckets: ArrayBuffer[ArrayBuffer[Tuple]] = _
+
+  /**
+   * Test hook: sizes of the buckets currently on the stack, listed bottom to top.
+   * Yields Nil when the executor has not been opened yet.
+   */
+  private[sort] def debugBucketSizes: List[Int] =
+    Option(sortedBuckets).fold(List.empty[Int]) { buckets =>
+      buckets.iterator.filterNot(_ == null).map(bucket => bucket.size).toList
+    }
+
+  /** Start with a fresh, empty bucket stack. */
+  override def open(): Unit =
+    sortedBuckets = new ArrayBuffer[ArrayBuffer[Tuple]]()
+
+  /** Drop every buffered bucket; does nothing if open() was never called. */
+  override def close(): Unit =
+    Option(sortedBuckets).foreach(_.clear())
+
+  /**
+   * Buffer one input tuple; nothing is emitted until onFinish.
+   *
+   * The first tuple fixes the input schema, and the sort keys are compiled against it once.
+   * Every tuple then becomes its own one-element sorted bucket and is handed to
+   * pushBucketAndCombine, which may merge it with buckets already on the stack.
+   */
+  override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
+    val isFirstTuple = inputSchema == null
+    if (isFirstTuple) {
+      inputSchema = tuple.getSchema
+      compiledSortKeys = compileSortKeys(inputSchema)
+    }
+    pushBucketAndCombine(ArrayBuffer[Tuple](tuple))
+    Iterator.empty
+  }
+
+  /**
+   * Emit every buffered tuple in sorted order.
+   *
+   * The bucket stack is collapsed bottom-to-top (left-to-right). The running result is always
+   * the left operand of mergeSortedBuckets, which prefers the left bucket on ties, so equal keys
+   * keep their arrival order. The fully merged bucket is left as the only entry on the stack,
+   * so calling onFinish again emits the same tuples.
+   *
+   * Returns an empty iterator when nothing was buffered, or when open() was never called
+   * (matching the null guards in close() and debugBucketSizes).
+   */
+  override def onFinish(port: Int): Iterator[TupleLike] =
+    if (sortedBuckets == null || sortedBuckets.isEmpty) {
+      Iterator.empty
+    } else {
+      val merged =
+        sortedBuckets.iterator.drop(1).foldLeft(sortedBuckets.head) { (accumulator, nextBucket) =>
+          mergeSortedBuckets(accumulator, nextBucket)
+        }
+      sortedBuckets.clear()
+      sortedBuckets.append(merged)
+      merged.iterator
+    }
+
+  /**
+   * Resolve logical sort keys to schema indices and attribute types.
+   * Outputs an array of compiled sort keys used by [[compareBySortKeys]].
+   */
+  private def compileSortKeys(schema: Schema): Array[CompiledSortKey] = {
+    desc.keys.map { k: SortCriteriaUnit =>
+      val name = k.attributeName

Review Comment:
   This still uses the single-letter name `k`; please give it a descriptive name (e.g. `sortCriteria`).



##########
common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala:
##########
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.operator.sort
+
+import org.apache.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.sql.Timestamp
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.jdk.CollectionConverters.IterableHasAsJava
+
+/**
+ * Integration and internal-behavior tests for [[StableMergeSortOpExec]].
+ *
+ * Scope & coverage:
+ *  - Single-key semantics across core types (BOOLEAN, INTEGER/LONG, DOUBLE, STRING, TIMESTAMP).
+ *  - Multi-key lexicographic ordering (mixed directions/types) with null/NaN handling.
+ *  - Stability guarantees (relative order preserved for equal keys) and pass-through when no keys.
+ *  - Incremental “bucket stack” invariants (binary-carry sizes; no adjacent equal sizes).
+ *  - Operational properties (buffering behavior, idempotent onFinish, scale sanity).
+ *  - Test hooks for internal merge logic (mergeSortedBuckets, pushBucketAndCombine).
+ *
+ * Null policy:
+ *  - Nulls are always ordered last, regardless of ASC/DESC.
+ *  - NaN participates as a non-null floating value per Double comparison semantics.
+ *
+ * Notes:
+ *  - Some tests rely on package-visible test hooks to validate internals deterministically.
+ */
+class StableMergeSortOpExecSpec extends AnyFlatSpec {
+
+  // ===========================================================================
+  // Helpers
+  // ===========================================================================
+
+  /** Build a Schema containing the given (name, type) attributes, in the order given. */
+  private def schemaOf(attributes: (String, AttributeType)*): Schema =
+    attributes.foldLeft(Schema()) {
+      case (schemaSoFar, (attributeName, attributeType)) =>
+        schemaSoFar.add(new Attribute(attributeName, attributeType))
+    }
+
+  /**
+   * Construct a Tuple for the provided schema.
+   *
+   * @param schema schema of the tuple to build; attribute values are added in schema order.
+   * @param values map-like varargs: "colName" -> value. Every schema column needs an entry;
+   *               entries whose key is not a schema column are ignored, and for a repeated
+   *               key the last entry wins.
+   * @throws NoSuchElementException if some schema column has no entry in `values`.
+   */
+  private def tupleOf(schema: Schema, values: (String, Any)*): Tuple = {
+    val valueMap = values.toMap
+    val builder  = Tuple.builder(schema)
+    schema.getAttributeNames.asJava.forEach { name =>
+      val value = valueMap.getOrElse(
+        name,
+        throw new NoSuchElementException(s"tupleOf: no value provided for column '$name'")
+      )
+      builder.add(schema.getAttribute(name), value)
+    }
+    builder.build()
+  }
+
+  /**
+   * Convenience builder for a single sort criterion.
+   *
+   * @param attribute name of the column to sort by.
+   * @param pref      sort direction; ascending unless specified.
+   */
+  private def sortKey(attribute: String, pref: SortPreference = SortPreference.ASC): SortCriteriaUnit = {
+    val criteria = new SortCriteriaUnit()
+    criteria.attributeName = attribute
+    criteria.sortPreference = pref
+    criteria
+  }
+
+  /** Collect the given sort criteria, in order, into the operator config buffer. */
+  private def sortKeysBuffer(ks: SortCriteriaUnit*): ListBuffer[SortCriteriaUnit] =
+    ks.to(ListBuffer)
+
+  /**
+   * Run the operator over an in-memory sequence of tuples and return what it emits at onFinish.
+   *
+   * Tuples are fed to port 0 in the order given. Only the onFinish output is collected; the
+   * iterators returned by processTuple are discarded. The executor is closed even if
+   * processing or finishing throws.
+   *
+   * @param schema    schema of `tuples`; not consulted by this helper itself.
+   * @param tuples    input tuples, in arrival order.
+   * @param configure callback that sets sort keys (and any other options) on a fresh descriptor.
+   * @return the tuples emitted by onFinish, in emission order.
+   */
+  private def runStableMergeSort(
+      schema: Schema,
+      tuples: Seq[Tuple]
+  )(configure: StableMergeSortOpDesc => Unit): List[Tuple] = {
+    val desc = new StableMergeSortOpDesc()
+    configure(desc)
+    val exec = new StableMergeSortOpExec(objectMapper.writeValueAsString(desc))
+    exec.open()
+    try {
+      tuples.foreach(tuple => exec.processTuple(tuple, 0))
+      exec.onFinish(0).map(_.asInstanceOf[Tuple]).toList
+    } finally {
+      exec.close()
+    }
+  }
+
+  /** Read the executor's current bucket sizes (bottom to top) through its package-private test hook. */
+  private def getBucketSizes(exec: StableMergeSortOpExec): List[Int] = {
+    exec.debugBucketSizes
+  }
+
+  /** Decompose an integer into its set bit powers-of-two (sorted); used to 
check the binary-carry invariant. */
+  private def binaryDecomposition(n: Int): List[Int] = {
+    var k = n
+    val out = scala.collection.mutable.ListBuffer[Int]()
+    while (k > 0) {
+      val p = Integer.lowestOneBit(k)
+      out += p
+      k -= p

Review Comment:
   Please avoid single-character variable names here (e.g. `k`, `p`); use descriptive names instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to