This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 9cede6f439 test(amber): add unit test coverage for EmptyReplayLogger and ReplayLogGenerator (#5554)
9cede6f439 is described below
commit 9cede6f4393ed2822fa1e103706b3f73e14af8b5
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Jun 16 00:06:48 2026 -0700
test(amber): add unit test coverage for EmptyReplayLogger and ReplayLogGenerator (#5554)
### What changes were proposed in this PR?
Pin behavior of two previously-uncovered modules in
`engine/architecture/logreplay`. No production-code changes.
| Spec | Source class | Tests |
| --- | --- | --- |
| `EmptyReplayLoggerSpec` | `EmptyReplayLogger` | 9 |
| `ReplayLogGeneratorSpec` | `ReplayLogGenerator` | 10 |
Both spec files follow the `<srcClassName>Spec.scala` one-to-one
convention.
**Behavior pinned — `EmptyReplayLogger`**
| Surface | Contract |
| --- | --- |
| `drainCurrentLogRecords(step)` | returns an empty `Array[ReplayLogRecord]` regardless of `step` (positive, zero, `Long.MaxValue`, negative); returns a non-null array; element type is `ReplayLogRecord` (compile-time enforced) |
| `markAsReplayDestination` | no-op for any `EmbeddedControlMessageIdentity`; does not accumulate state |
| `logCurrentStepWithMessage` | no-op for any `(step, channelId, msg)` triple, including `msg = None` and 100 successive calls |
| Trait conformance | usable through the `ReplayLogger` interface |
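
The null-object contract in the table above can be illustrated with a minimal, self-contained sketch. The names `Logger`, `EmptyLogger`, `BufferingLogger`, `Record`, and `Step` are hypothetical stand-ins invented for this example, not the real `ReplayLogger` API; only the shape of the contract (every call absorbed, drain always empty and non-null) is taken from the description.

```scala
object NullObjectSketch {
  // Hypothetical, simplified record type (not the real ReplayLogRecord).
  sealed trait Record
  final case class Step(n: Long) extends Record

  // Hypothetical, simplified logger interface.
  trait Logger {
    def log(step: Long): Unit
    def drain(step: Long): Array[Record]
  }

  // Null object: absorbs every call and never accumulates state.
  final class EmptyLogger extends Logger {
    override def log(step: Long): Unit = ()
    override def drain(step: Long): Array[Record] = Array.empty[Record]
  }

  // A recording implementation, shown only for contrast.
  final class BufferingLogger extends Logger {
    private val buf = scala.collection.mutable.ArrayBuffer.empty[Record]
    override def log(step: Long): Unit = buf += Step(step)
    override def drain(step: Long): Array[Record] = {
      val out: Array[Record] = buf.toArray
      buf.clear()
      out
    }
  }
}
```

Callers hold a `Logger` reference and never need to branch on whether logging is enabled, which is the property the trait-conformance test pins.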
**Behavior pinned — `ReplayLogGenerator.generate`**
| Surface | Contract |
| --- | --- |
| `getStorage(None)` (EmptyRecordStorage) | returns `(empty queue, empty queue)` |
| empty `VFSRecordStorage` file | returns `(empty queue, empty queue)` |
| only `ProcessingStep` records | enqueues all into the steps queue, preserving insertion order |
| only `MessageContent` records | enqueues all into the messages queue, preserving insertion order |
| interleaved steps + messages | partitions correctly by type, preserving per-type order |
| `ReplayDestination(id)` matching `replayTo` | short-circuits — records after the cut are NOT enqueued |
| `ReplayDestination(id)` NOT matching `replayTo` | is silently skipped, iteration continues |
| multiple matching `ReplayDestination` | stops at the FIRST one |
| unknown record type (e.g. `TerminateSignal`) | throws `RuntimeException` with a diagnostic message |
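
As a rough illustration of the contract above, the partitioning rules can be sketched over an in-memory iterator. This is not the production `ReplayLogGenerator.generate`: the record types (`Step`, `Message`, `Destination`, `Terminate`), the `partition` helper, and the use of plain strings for ids and payloads are all assumptions made up for the example.

```scala
import scala.collection.mutable

object ReplaySketch {
  // Hypothetical, simplified record hierarchy.
  sealed trait Rec
  final case class Step(n: Long) extends Rec
  final case class Message(payload: String) extends Rec
  final case class Destination(id: String) extends Rec
  case object Terminate extends Rec

  // Splits records into (steps, message payloads), stopping at the first
  // Destination whose id equals `replayTo`. Non-matching destinations are
  // skipped; any other record type is rejected.
  def partition(
      records: Iterator[Rec],
      replayTo: String
  ): (mutable.Queue[Step], mutable.Queue[String]) = {
    val steps = mutable.Queue.empty[Step]
    val messages = mutable.Queue.empty[String]
    var done = false
    while (!done && records.hasNext) {
      records.next() match {
        case s: Step         => steps += s
        case Message(p)      => messages += p
        case Destination(id) => if (id == replayTo) done = true
        case other           => throw new RuntimeException(s"cannot handle $other in the log")
      }
    }
    (steps, messages)
  }
}
```

Using a `done` flag rather than an early `return` keeps a single exit path, which makes it easier to release any underlying reader afterwards.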
**Notes**
While writing `ReplayLogGeneratorSpec` I discovered a **production
stream-leak** in `ReplayLogGenerator.generate`: when it hits the
matching `ReplayDestination` it short-circuits via a non-local `return`,
leaving the `SequentialRecordReader`'s underlying `Input` stream open.
On Windows the leaked file handle blocks subsequent temp-dir deletion.
The spec's `cleanup` helper tolerates the resulting
`FileSystemException` so the case still passes; the real fix is at the
source and is out of scope for a test-coverage PR (will file a follow-up
issue).
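
One possible shape for the follow-up fix, sketched under assumptions: wrap the read loop in `try`/`finally` so the reader is closed on every exit path, including an early stop. `TrackingReader` and `readUntil` are hypothetical names for this example; the real `SequentialRecordReader` API may differ.

```scala
object CloseSketch {
  // Hypothetical reader that records whether close() was called.
  final class TrackingReader(items: List[Int]) extends AutoCloseable {
    var closed: Boolean = false
    def iterator: Iterator[Int] = items.iterator
    override def close(): Unit = closed = true
  }

  // Collects items until `stopAt` is seen. The finally block runs whether
  // the loop ends early, runs to completion, or throws.
  def readUntil(reader: TrackingReader, stopAt: Int): List[Int] =
    try reader.iterator.takeWhile(_ != stopAt).toList
    finally reader.close()
}
```

On Scala 2.13 and later, `scala.util.Using` provides the same guarantee for any `AutoCloseable`.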
`ReplayLogGeneratorSpec` uses a temp-dir-backed
`VFSRecordStorage[ReplayLogRecord]` and the production
`AmberRuntime.serde` (suite-local `ActorSystem` injected via reflection,
torn down in `afterAll`) — same harness pattern as
`CheckpointSubsystemSpec` / `ClientEventSpec` / `VFSRecordStorageSpec`.
### Any related issues, documentation, discussions?
Closes #5550.
### How was this PR tested?
Pure unit-test additions; verified locally with:
- `sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.logreplay.EmptyReplayLoggerSpec org.apache.texera.amber.engine.architecture.logreplay.ReplayLogGeneratorSpec"` — 19 tests, all green
- `sbt scalafmtCheckAll` — clean
- CI to confirm
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Sonnet 4.5)
---
.../logreplay/EmptyReplayLoggerSpec.scala | 130 ++++++++
.../logreplay/ReplayLogGeneratorSpec.scala | 361 +++++++++++++++++++++
2 files changed, 491 insertions(+)
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/EmptyReplayLoggerSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/EmptyReplayLoggerSpec.scala
new file mode 100644
index 0000000000..ccd7fde8f9
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/EmptyReplayLoggerSpec.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.logreplay
+
+import org.apache.texera.amber.core.virtualidentity.{
+ ActorVirtualIdentity,
+ ChannelIdentity,
+ EmbeddedControlMessageIdentity
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class EmptyReplayLoggerSpec extends AnyFlatSpec {
+
+ // ---------------------------------------------------------------------------
+ // Fixtures
+ // ---------------------------------------------------------------------------
+
+ private val channelId: ChannelIdentity =
+ ChannelIdentity(ActorVirtualIdentity("from"), ActorVirtualIdentity("to"), isControl = false)
+ private val ecmId: EmbeddedControlMessageIdentity = EmbeddedControlMessageIdentity("test-ecm")
+
+ // ---------------------------------------------------------------------------
+ // drainCurrentLogRecords — always empty
+ // ---------------------------------------------------------------------------
+
+ "EmptyReplayLogger.drainCurrentLogRecords" should
+ "return an empty Array[ReplayLogRecord] regardless of the step argument" in {
+ val logger = new EmptyReplayLogger
+ val r0 = logger.drainCurrentLogRecords(0L)
+ val r1 = logger.drainCurrentLogRecords(1L)
+ val rMax = logger.drainCurrentLogRecords(Long.MaxValue)
+ val rNeg = logger.drainCurrentLogRecords(-1L)
+ assert(r0.isEmpty)
+ assert(r1.isEmpty)
+ assert(rMax.isEmpty)
+ assert(rNeg.isEmpty)
+ }
+
+ it should "return a non-null array (callers iterate it without null-checking)" in {
+ val logger = new EmptyReplayLogger
+ val r = logger.drainCurrentLogRecords(42L)
+ assert(r != null)
+ assert(r.length == 0)
+ }
+
+ it should "return arrays whose element type is ReplayLogRecord (compile-time enforced)" in {
+ // If a future refactor accidentally widened the return type to
+ // `Array[AnyRef]`, this would fail to typecheck. Pin the contract.
+ val logger = new EmptyReplayLogger
+ val r: Array[ReplayLogRecord] = logger.drainCurrentLogRecords(0L)
+ assert(r.length == 0)
+ }
+
+ // ---------------------------------------------------------------------------
+ // markAsReplayDestination — no-op
+ // ---------------------------------------------------------------------------
+
+ "EmptyReplayLogger.markAsReplayDestination" should
+ "accept any EmbeddedControlMessageIdentity without throwing" in {
+ val logger = new EmptyReplayLogger
+ logger.markAsReplayDestination(ecmId) // must not throw
+ // Calling twice with the same id is still a no-op.
+ logger.markAsReplayDestination(ecmId)
+ succeed
+ }
+
+ it should "leave drainCurrentLogRecords output untouched (no internal buffer accumulates)" in {
+ val logger = new EmptyReplayLogger
+ logger.markAsReplayDestination(ecmId)
+ logger.markAsReplayDestination(EmbeddedControlMessageIdentity("another"))
+ assert(logger.drainCurrentLogRecords(0L).isEmpty)
+ }
+
+ // ---------------------------------------------------------------------------
+ // logCurrentStepWithMessage — no-op
+ // ---------------------------------------------------------------------------
+
+ "EmptyReplayLogger.logCurrentStepWithMessage" should
+ "accept any (step, channelId, msg) triple without throwing" in {
+ val logger = new EmptyReplayLogger
+ logger.logCurrentStepWithMessage(0L, channelId, msg = None)
+ logger.logCurrentStepWithMessage(1L, channelId, msg = None)
+ logger.logCurrentStepWithMessage(Long.MaxValue, channelId, msg = None)
+ succeed
+ }
+
+ it should "tolerate a None msg argument (the null-object's job is to absorb every call)" in {
+ val logger = new EmptyReplayLogger
+ logger.logCurrentStepWithMessage(7L, channelId, msg = None)
+ // Verify nothing was queued in the process.
+ assert(logger.drainCurrentLogRecords(7L).isEmpty)
+ }
+
+ it should "leave drainCurrentLogRecords output empty even after many calls" in {
+ val logger = new EmptyReplayLogger
+ (1L to 100L).foreach(i => logger.logCurrentStepWithMessage(i, channelId, msg = None))
+ assert(logger.drainCurrentLogRecords(100L).isEmpty)
+ }
+
+ // ---------------------------------------------------------------------------
+ // ReplayLogger trait conformance
+ // ---------------------------------------------------------------------------
+ //
+ // The null-object pattern requires EmptyReplayLogger to be a drop-in for
+ // ReplayLogger callers — pin the upcast.
+
+ "EmptyReplayLogger" should "be usable through the ReplayLogger interface" in {
+ val logger: ReplayLogger = new EmptyReplayLogger
+ logger.logCurrentStepWithMessage(0L, channelId, msg = None)
+ logger.markAsReplayDestination(ecmId)
+ assert(logger.drainCurrentLogRecords(0L).isEmpty)
+ }
+}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/ReplayLogGeneratorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/ReplayLogGeneratorSpec.scala
new file mode 100644
index 0000000000..e702f6ca5f
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/ReplayLogGeneratorSpec.scala
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.logreplay
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.serialization.{Serialization, SerializationExtension}
+import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.core.virtualidentity.{
+ ActorVirtualIdentity,
+ ChannelIdentity,
+ EmbeddedControlMessageIdentity
+}
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage}
+import org.apache.texera.amber.engine.common.storage.{SequentialRecordStorage, VFSRecordStorage}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.nio.file.{Files, Path}
+
+class ReplayLogGeneratorSpec extends AnyFlatSpec with BeforeAndAfterAll {
+
+ // ---------------------------------------------------------------------------
+ // Suite-local Pekko serde injected into AmberRuntime via reflection
+ // ---------------------------------------------------------------------------
+ //
+ // `SequentialRecordWriter.writeRecord` hard-codes `AmberRuntime.serde`,
+ // so any test that round-trips records through VFSRecordStorage needs
+ // AmberRuntime initialized. Pattern matches CheckpointSubsystemSpec /
+ // ClientEventSpec — own a suite-local ActorSystem, inject it into
+ // AmberRuntime's private vars via reflection, tear down in afterAll.
+
+ private val testSystem: ActorSystem =
+ ActorSystem("ReplayLogGeneratorSpec-test", AmberRuntime.pekkoConfig)
+ private val testSerde: Serialization = SerializationExtension(testSystem)
+
+ private def getAmberRuntimeField(name: String): AnyRef = {
+ val field = AmberRuntime.getClass.getDeclaredField(name)
+ field.setAccessible(true)
+ field.get(AmberRuntime)
+ }
+
+ private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
+ val field = AmberRuntime.getClass.getDeclaredField(name)
+ field.setAccessible(true)
+ field.set(AmberRuntime, value)
+ }
+
+ // Capture whatever AmberRuntime held before we overwrite it so afterAll can
+ // restore it. Unconditionally nulling the fields would clobber an already
+ // initialized AmberRuntime and couple this suite to test execution order.
+ private var prevActorSystem: AnyRef = _
+ private var prevSerde: AnyRef = _
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ prevActorSystem = getAmberRuntimeField("_actorSystem")
+ prevSerde = getAmberRuntimeField("_serde")
+ setAmberRuntimeField("_actorSystem", testSystem)
+ setAmberRuntimeField("_serde", testSerde)
+ }
+
+ override protected def afterAll(): Unit = {
+ setAmberRuntimeField("_serde", prevSerde)
+ setAmberRuntimeField("_actorSystem", prevActorSystem)
+ TestKit.shutdownActorSystem(testSystem)
+ super.afterAll()
+ }
+
+ private val isWindows: Boolean =
+ System.getProperty("os.name", "").toLowerCase.contains("win")
+
+ // Best-effort temp-dir cleanup. `Files.walk` returns a closeable Stream
+ // backed by an open directory handle — wrap in try/finally so the
+ // handle is released even if traversal throws.
+ //
+ // On Windows we tolerate `FileSystemException` on `deleteIfExists` because
+ // `ReplayLogGenerator.generate` short-circuits at `ReplayDestination`
+ // via a non-local `return`, which leaks the underlying
+ // `SequentialRecordReader.Input` stream — and a leaked open file handle
+ // blocks the temp file from being deleted there. That is a production bug
+ // to fix separately; in-test we just let the OS reap the temp files later
+ // instead of failing the case. On other platforms an open handle does not
+ // block deletion, so a `FileSystemException` signals a real problem and is
+ // allowed to propagate.
+ private def cleanup(sub: Path): Unit = {
+ val root = sub.getParent
+ if (root == null || !Files.exists(root)) return
+ val stream = Files.walk(root)
+ try {
+ stream
+ .sorted(java.util.Comparator.reverseOrder())
+ .forEach { child =>
+ try Files.deleteIfExists(child)
+ catch { case _: java.nio.file.FileSystemException if isWindows => () }
+ }
+ } finally {
+ stream.close()
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Fixtures
+ // ---------------------------------------------------------------------------
+
+ private val cid: ChannelIdentity =
+ ChannelIdentity(ActorVirtualIdentity("from"), ActorVirtualIdentity("to"), isControl = false)
+ private val destA: EmbeddedControlMessageIdentity = EmbeddedControlMessageIdentity("dest-A")
+ private val destB: EmbeddedControlMessageIdentity = EmbeddedControlMessageIdentity("dest-B")
+
+ private def newStorage(): (Path, SequentialRecordStorage[ReplayLogRecord]) = {
+ val root = Files.createTempDirectory("replay-log-generator-spec-")
+ val sub = root.resolve("logs")
+ val storage = new VFSRecordStorage[ReplayLogRecord](sub.toUri)
+ (sub, storage)
+ }
+
+ private def writeLog(
+ storage: SequentialRecordStorage[ReplayLogRecord],
+ records: Seq[ReplayLogRecord]
+ ): Unit = {
+ val writer = storage.getWriter("log")
+ // Close in a finally so a serialization failure mid-write does not leak
+ // the underlying output stream (which would otherwise block temp-dir
+ // cleanup, especially on Windows).
+ try {
+ records.foreach(writer.writeRecord)
+ writer.flush()
+ } finally {
+ writer.close()
+ }
+ }
+
+ private def msg(seq: Long): WorkflowFIFOMessage =
+ WorkflowFIFOMessage(cid, seq, DataFrame(Array.empty))
+
+ // ---------------------------------------------------------------------------
+ // Empty storage
+ // ---------------------------------------------------------------------------
+
+ "ReplayLogGenerator.generate" should
+ "return empty queues when the storage is an EmptyRecordStorage" in {
+ val storage = SequentialRecordStorage.getStorage[ReplayLogRecord](None)
+ val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+ assert(steps.isEmpty)
+ assert(messages.isEmpty)
+ }
+
+ it should "return empty queues when the storage file exists but holds no records" in {
+ val (sub, storage) = newStorage()
+ try {
+ writeLog(storage, Seq.empty)
+ val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+ assert(steps.isEmpty)
+ assert(messages.isEmpty)
+ } finally {
+ cleanup(sub)
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Partitioning by record type
+ // ---------------------------------------------------------------------------
+
+ it should "enqueue all ProcessingStep records into the steps queue (preserving order)" in {
+ val (sub, storage) = newStorage()
+ try {
+ val recs = Seq(
+ ProcessingStep(cid, 1L),
+ ProcessingStep(cid, 2L),
+ ProcessingStep(cid, 3L)
+ )
+ writeLog(storage, recs)
+ val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+ assert(steps.toList == recs)
+ assert(messages.isEmpty)
+ } finally {
+ cleanup(sub)
+ }
+ }
+
+ it should "enqueue all MessageContent records into the messages queue (preserving order)" in {
+ val (sub, storage) = newStorage()
+ try {
+ val m1 = msg(1L)
+ val m2 = msg(2L)
+ writeLog(storage, Seq(MessageContent(m1), MessageContent(m2)))
+ val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+ assert(steps.isEmpty)
+ // The pair is round-tripped through Kryo; the message reference is
+ // not preserved (a fresh deserialized copy comes back) — so compare
+ // by case-class equality, not `eq`.
+ assert(messages.toList == List(m1, m2))
+ } finally {
+ cleanup(sub)
+ }
+ }
+
+ it should
+ "partition steps and messages independently when records are interleaved" in {
+ val (sub, storage) = newStorage()
+ try {
+ val s1 = ProcessingStep(cid, 1L)
+ val s2 = ProcessingStep(cid, 2L)
+ val m1 = msg(1L)
+ val m2 = msg(2L)
+ writeLog(
+ storage,
+ Seq(s1, MessageContent(m1), s2, MessageContent(m2))
+ )
+ val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+ assert(steps.toList == List(s1, s2))
+ assert(messages.toList == List(m1, m2))
+ } finally {
+ cleanup(sub)
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // ReplayDestination — early termination + skip semantics
+ // ---------------------------------------------------------------------------
+
+ it should
+ "short-circuit at the matching ReplayDestination, ignoring records that follow it" in {
+ val (sub, storage) = newStorage()
+ try {
+ val s1 = ProcessingStep(cid, 1L)
+ val m1 = msg(1L)
+ val m2Past = msg(2L) // must NOT appear in the result
+ val s2Past = ProcessingStep(cid, 99L) // must NOT appear either
+ writeLog(
+ storage,
+ Seq(
+ s1,
+ MessageContent(m1),
+ ReplayDestination(destA), // <-- replayTo target; iteration stops here
+ MessageContent(m2Past),
+ s2Past
+ )
+ )
+ val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+ assert(steps.toList == List(s1))
+ assert(messages.toList == List(m1))
+ } finally {
+ cleanup(sub)
+ }
+ }
+
+ it should
+ "silently skip a ReplayDestination whose id does not match replayTo (iteration continues)" in {
+ val (sub, storage) = newStorage()
+ try {
+ val s1 = ProcessingStep(cid, 1L)
+ val m1 = msg(1L)
+ writeLog(
+ storage,
+ Seq(
+ s1,
+ ReplayDestination(destB), // <-- different id; skipped
+ MessageContent(m1)
+ )
+ )
+ val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+ assert(steps.toList == List(s1))
+ assert(messages.toList == List(m1))
+ } finally {
+ cleanup(sub)
+ }
+ }
+
+ it should
+ "stop at the FIRST matching ReplayDestination when multiple matching records exist" in {
+ val (sub, storage) = newStorage()
+ try {
+ val s1 = ProcessingStep(cid, 1L)
+ val s2Past = ProcessingStep(cid, 2L)
+ writeLog(
+ storage,
+ Seq(
+ s1,
+ ReplayDestination(destA), // <-- first match
+ s2Past, // <-- after the cut
+ ReplayDestination(destA)
+ )
+ )
+ val (steps, _) = ReplayLogGenerator.generate(storage, "log", destA)
+ assert(steps.toList == List(s1))
+ } finally {
+ cleanup(sub)
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Unknown record type — TerminateSignal triggers the `case other` branch
+ // ---------------------------------------------------------------------------
+
+ it should "throw RuntimeException on an unhandled record type (e.g. TerminateSignal)" in {
+ val (sub, storage) = newStorage()
+ try {
+ writeLog(storage, Seq(TerminateSignal))
+ val ex = intercept[RuntimeException] {
+ ReplayLogGenerator.generate(storage, "log", destA)
+ }
+ assert(
+ ex.getMessage.toLowerCase.contains("cannot handle"),
+ s"expected diagnostic message about unhandled record, got: ${ex.getMessage}"
+ )
+ } finally {
+ cleanup(sub)
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Mixed-record full-cycle
+ // ---------------------------------------------------------------------------
+
+ it should "handle a realistic mix of steps + messages + non-matching destinations" in {
+ val (sub, storage) = newStorage()
+ try {
+ val s1 = ProcessingStep(cid, 10L)
+ val s2 = ProcessingStep(cid, 20L)
+ val s3 = ProcessingStep(cid, 30L)
+ val m1 = msg(1L)
+ val m2 = msg(2L)
+ writeLog(
+ storage,
+ Seq(
+ s1,
+ MessageContent(m1),
+ ReplayDestination(destB), // skipped (id mismatch)
+ s2,
+ MessageContent(m2),
+ ReplayDestination(destB), // also skipped
+ s3
+ )
+ )
+ val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+ assert(steps.toList == List(s1, s2, s3))
+ assert(messages.toList == List(m1, m2))
+ } finally {
+ cleanup(sub)
+ }
+ }
+}