This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 9fc5b39d96 test(amber): add unit test coverage for NetworkOutputBuffer (#4958)
9fc5b39d96 is described below
commit 9fc5b39d96ced08576732d78eab67c0d4645f4d0
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed May 6 16:06:24 2026 -0700
test(amber): add unit test coverage for NetworkOutputBuffer (#4958)
---
.../partitioners/NetworkOutputBufferSpec.scala | 274 +++++++++++++++++++++
1 file changed, 274 insertions(+)
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/sendsemantics/partitioners/NetworkOutputBufferSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/sendsemantics/partitioners/NetworkOutputBufferSpec.scala
new file mode 100644
index 0000000000..765dd386b8
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/sendsemantics/partitioners/NetworkOutputBufferSpec.scala
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.sendsemantics.partitioners
+
+import org.apache.texera.amber.config.ApplicationConfig
+import org.apache.texera.amber.core.state.State
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema,
Tuple}
+import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
+import
org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway
+import org.apache.texera.amber.engine.common.ambermessage.{
+ DataFrame,
+ StateFrame,
+ WorkflowFIFOMessage
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
+import scala.collection.mutable.ArrayBuffer
+
+/** Tests for [[NetworkOutputBuffer]], observed through a recording `NetworkOutputGateway`. */
+class NetworkOutputBufferSpec extends AnyFlatSpec {
+
+  // --- fixtures --------------------------------------------------------------
+
+  private val sender = ActorVirtualIdentity("sender")
+  private val receiver = ActorVirtualIdentity("receiver-1")
+
+  private val intAttr = new Attribute("v", AttributeType.INTEGER)
+  private val schema: Schema = Schema().add(intAttr)
+  private def tuple(value: Int): Tuple =
+    Tuple.builder(schema).add(intAttr, value).build()
+
+  /** Real `NetworkOutputGateway` whose sent messages are all appended to `messages`. */
+  private class Capture {
+    val messages: ArrayBuffer[WorkflowFIFOMessage] = ArrayBuffer.empty
+    val gateway: NetworkOutputGateway =
+      new NetworkOutputGateway(sender, m => messages += m)
+  }
+
+  /** A buffer targeting `receiver` plus the Capture that records everything it sends. */
+  private def newBuffer(batchSize: Int = 4): (NetworkOutputBuffer, Capture) = {
+    val cap = new Capture
+    (new NetworkOutputBuffer(receiver, cap.gateway, batchSize = batchSize), cap)
+  }
+
+  /** Tuples carried by `msg`; fails the test with a readable message unless it is a DataFrame. */
+  private def tuplesOf(msg: WorkflowFIFOMessage): List[Tuple] =
+    msg.payload match {
+      case df: DataFrame => df.frame.toList
+      case other         => fail(s"expected a DataFrame payload, got $other")
+    }
+
+  // --- construction defaults -------------------------------------------------
+
+  "NetworkOutputBuffer" should "default batchSize to ApplicationConfig.defaultDataTransferBatchSize" in {
+    val buf = new NetworkOutputBuffer(receiver, new Capture().gateway)
+    assert(buf.batchSize == ApplicationConfig.defaultDataTransferBatchSize)
+  }
+
+  it should "expose the constructor-supplied `to` and `dataOutputPort`" in {
+    val cap = new Capture
+    val buf = new NetworkOutputBuffer(receiver, cap.gateway, batchSize = 4)
+    assert(buf.to == receiver)
+    assert(buf.dataOutputPort eq cap.gateway)
+  }
+
+  it should "start with an empty buffer (no implicit auto-flush at construction)" in {
+    val (_, cap) = newBuffer()
+    assert(cap.messages.isEmpty)
+  }
+
+  // --- addTuple buffering / auto-flush --------------------------------------
+
+  "NetworkOutputBuffer.addTuple" should "NOT flush while the buffer is below batchSize" in {
+    val (buf, cap) = newBuffer(batchSize = 4)
+    buf.addTuple(tuple(0))
+    buf.addTuple(tuple(1))
+    buf.addTuple(tuple(2))
+    assert(cap.messages.isEmpty, "no DataFrame should be sent until batchSize is reached")
+  }
+
+  it should "auto-flush when the buffer exactly reaches batchSize" in {
+    val (buf, cap) = newBuffer(batchSize = 3)
+    buf.addTuple(tuple(0))
+    buf.addTuple(tuple(1))
+    buf.addTuple(tuple(2)) // boundary: now size == batchSize
+    assert(cap.messages.size == 1, "exactly one DataFrame should be auto-flushed at the boundary")
+    assert(tuplesOf(cap.messages.head) == List(tuple(0), tuple(1), tuple(2)))
+  }
+
+  it should "produce a separate DataFrame for each successive batch" in {
+    val (buf, cap) = newBuffer(batchSize = 2)
+    (0 until 6).foreach(i => buf.addTuple(tuple(i)))
+    assert(cap.messages.size == 3, "three full batches → three DataFrames")
+    val payloads = cap.messages.map(tuplesOf)
+    assert(payloads.head == List(tuple(0), tuple(1)))
+    assert(payloads(1) == List(tuple(2), tuple(3)))
+    assert(payloads(2) == List(tuple(4), tuple(5)))
+  }
+
+  it should "send DataFrames to the configured receiver only" in {
+    val (buf, cap) = newBuffer(batchSize = 2)
+    buf.addTuple(tuple(0))
+    buf.addTuple(tuple(1))
+    assert(cap.messages.size == 1)
+    val msg = cap.messages.head
+    assert(msg.channelId.fromWorkerId == sender)
+    assert(msg.channelId.toWorkerId == receiver)
+    assert(!msg.channelId.isControl, "data path must not use the control channel")
+  }
+
+  // --- flush() ----------------------------------------------------------------
+
+  "NetworkOutputBuffer.flush" should "send a DataFrame and reset the buffer when the buffer is non-empty" in {
+    val (buf, cap) = newBuffer(batchSize = 100) // never auto-flushes
+    buf.addTuple(tuple(7))
+    buf.addTuple(tuple(8))
+    buf.flush()
+    assert(cap.messages.size == 1)
+    assert(tuplesOf(cap.messages.head) == List(tuple(7), tuple(8)))
+    // A second flush() with nothing buffered must not send another frame.
+    buf.flush()
+    assert(cap.messages.size == 1, "flush() on an empty buffer must be a no-op")
+  }
+
+  it should "be a no-op when called on an empty buffer (no DataFrame, no StateFrame)" in {
+    val (buf, cap) = newBuffer()
+    buf.flush()
+    buf.flush()
+    buf.flush()
+    assert(cap.messages.isEmpty)
+  }
+
+  it should "assign monotonically increasing sequence numbers across multiple flushes" in {
+    // The gateway tracks sequence numbers per channel; each successive
+    // DataFrame on the same channel gets the next number. Pin so a
+    // regression that resets seq on flush is visible.
+    val (buf, cap) = newBuffer(batchSize = 1) // each addTuple flushes
+    (0 until 4).foreach(i => buf.addTuple(tuple(i)))
+    val seqs = cap.messages.map(_.sequenceNumber).toList
+    assert(seqs == List(0L, 1L, 2L, 3L), s"unexpected sequence: $seqs")
+  }
+
+  // --- sendState ----------------------------------------------------------
+
+  "NetworkOutputBuffer.sendState" should "flush pending tuples FIRST, then send the StateFrame" in {
+    val (buf, cap) = newBuffer(batchSize = 100)
+    buf.addTuple(tuple(0))
+    buf.addTuple(tuple(1))
+    val state = State(Map("checkpoint" -> 99))
+    buf.sendState(state)
+    // Expected order: DataFrame (the buffered tuples) → StateFrame.
+    assert(cap.messages.size == 2)
+    assert(tuplesOf(cap.messages.head) == List(tuple(0), tuple(1)))
+    assert(cap.messages(1).payload == StateFrame(state))
+  }
+
+  it should "send only the StateFrame when no tuples are pending (empty pre-flush is a no-op)" in {
+    val (buf, cap) = newBuffer()
+    val state = State(Map("k" -> "v"))
+    buf.sendState(state)
+    assert(cap.messages.size == 1)
+    assert(cap.messages.head.payload == StateFrame(state))
+  }
+
+  it should "leave the tuple buffer empty after sendState (trailing flush no-op)" in {
+    // sendState calls flush() AFTER sending the state too. Pin that the
+    // trailing flush doesn't double-send and that subsequent addTuple
+    // starts from a clean buffer.
+    val (buf, cap) = newBuffer(batchSize = 100)
+    buf.addTuple(tuple(0))
+    buf.sendState(State(Map.empty))
+    assert(cap.messages.size == 2) // DataFrame + StateFrame
+    // Add another tuple and explicit flush — must produce one fresh frame.
+    buf.addTuple(tuple(99))
+    buf.flush()
+    assert(cap.messages.size == 3)
+    assert(tuplesOf(cap.messages(2)) == List(tuple(99)), "post-state buffer must start empty")
+  }
+
+  it should "share a single sequence-number stream across DataFrames and the StateFrame on the same channel" in {
+    // Pin: DataFrame and StateFrame go through the same `sendTo` path on
+    // the same channel, so they share the gateway's sequence-number
+    // counter. A regression that opens a side-channel for StateFrame
+    // would produce a non-monotonic stream and fail this.
+    val (buf, cap) = newBuffer(batchSize = 100)
+    buf.addTuple(tuple(0))
+    buf.addTuple(tuple(1))
+    buf.sendState(State(Map("x" -> 1)))
+    buf.addTuple(tuple(2))
+    buf.flush()
+    val seqs = cap.messages.map(_.sequenceNumber).toList
+    assert(seqs == List(0L, 1L, 2L), s"unexpected sequence: $seqs")
+  }
+
+  // --- batchSize edge cases -------------------------------------------------
+
+  "NetworkOutputBuffer with batchSize = 1" should "flush immediately after every addTuple" in {
+    val (buf, cap) = newBuffer(batchSize = 1)
+    buf.addTuple(tuple(0))
+    assert(cap.messages.size == 1)
+    buf.addTuple(tuple(1))
+    assert(cap.messages.size == 2)
+    val frames = cap.messages.toList.map(tuplesOf)
+    assert(frames == List(List(tuple(0)), List(tuple(1))))
+  }
+
+  // `batchSize <= 0` IS reachable from production today: the
+  // workflow-settings UI restricts the value to `>= 1`, but
+  // `SyncExecutionResource` accepts `request.workflowSettings` directly
+  // from the API and the backend forwards `workflowSettings
+  // .dataTransferBatchSize` into `NetworkOutputBuffer` without
+  // validating it. The reachable path is covered by a characterization
+  // test (current lenient `>=` behavior — flush every tuple) plus a
+  // pendingUntilFixed test pinning the desired hardening (rejection
+  // at construction). When the hardening lands the characterization
+  // test breaks on purpose AND pendingUntilFixed flips into a
+  // deliberate failure forcing both markers to be updated together.
+
+  "NetworkOutputBuffer with non-positive batchSize" should
+    "currently flush per-tuple under the `>=` guard (characterization, today's lenient behavior)" in {
+    // Pin the current observable behavior for the reachable-from-API
+    // `batchSize <= 0` path so a regression that breaks per-tuple
+    // flush (e.g. a partial change that disables flushing entirely
+    // for non-positive batch sizes) surfaces here. A future hardening
+    // that rejects `<= 0` at construction WILL break this test on
+    // purpose — and the pendingUntilFixed test below will flip into
+    // a deliberate failure at the same time, forcing both markers to
+    // be updated together.
+    val (buf0, cap0) = newBuffer(batchSize = 0)
+    buf0.addTuple(tuple(1))
+    buf0.addTuple(tuple(2))
+    val frames0 = cap0.messages.toList.map(tuplesOf)
+    assert(frames0 == List(List(tuple(1)), List(tuple(2))))
+
+    val (bufNeg, capNeg) = newBuffer(batchSize = -1)
+    bufNeg.addTuple(tuple(99))
+    val framesNeg = capNeg.messages.toList.map(tuplesOf)
+    assert(framesNeg == List(List(tuple(99))))
+  }
+
+  it should "eventually reject construction (pendingUntilFixed)" in pendingUntilFixed {
+    // Today the constructor accepts `batchSize <= 0` and the `>=`
+    // guard then fires after every append (the characterization
+    // above pins that behavior). The intended contract is that a
+    // non-positive batch size is invalid input and should be
+    // rejected at construction (e.g. `require(batchSize > 0, ...)`).
+    // Asserting `IllegalArgumentException` here flips this from
+    // pending to passing once the hardening lands.
+    val cap = new Capture
+    intercept[IllegalArgumentException] {
+      new NetworkOutputBuffer(receiver, cap.gateway, batchSize = 0)
+    }
+    intercept[IllegalArgumentException] {
+      new NetworkOutputBuffer(receiver, cap.gateway, batchSize = -1)
+    }
+  }
+}