This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 9cede6f439 test(amber): add unit test coverage for EmptyReplayLogger 
and ReplayLogGenerator (#5554)
9cede6f439 is described below

commit 9cede6f4393ed2822fa1e103706b3f73e14af8b5
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Jun 16 00:06:48 2026 -0700

    test(amber): add unit test coverage for EmptyReplayLogger and 
ReplayLogGenerator (#5554)
    
    ### What changes were proposed in this PR?
    
    Pin behavior of two previously-uncovered modules in
    `engine/architecture/logreplay`. No production-code changes.
    
    | Spec | Source class | Tests |
    | --- | --- | --- |
    | `EmptyReplayLoggerSpec` | `EmptyReplayLogger` | 9 |
    | `ReplayLogGeneratorSpec` | `ReplayLogGenerator` | 10 |
    
    Both spec files follow the `<srcClassName>Spec.scala` one-to-one
    convention.
    
    **Behavior pinned — `EmptyReplayLogger`**
    
    | Surface | Contract |
    | --- | --- |
    | `drainCurrentLogRecords(step)` | returns an empty
    `Array[ReplayLogRecord]` regardless of `step` (positive, zero,
    `Long.MaxValue`, negative); returns a non-null array; element type is
    `ReplayLogRecord` (compile-time enforced) |
    | `markAsReplayDestination` | no-op for any
    `EmbeddedControlMessageIdentity`; does not accumulate state |
    | `logCurrentStepWithMessage` | no-op for any `(step, channelId, msg)`
    triple, including `msg = None` and 100 successive calls |
    | Trait conformance | usable through the `ReplayLogger` interface |
    
    **Behavior pinned — `ReplayLogGenerator.generate`**
    
    | Surface | Contract |
    | --- | --- |
    | `getStorage(None)` (EmptyRecordStorage) | returns `(empty queue, empty
    queue)` |
    | empty `VFSRecordStorage` file | returns `(empty queue, empty queue)` |
    | only `ProcessingStep` records | enqueues all into the steps queue,
    preserving insertion order |
    | only `MessageContent` records | enqueues all into the messages queue,
    preserving insertion order |
    | interleaved steps + messages | partitions correctly by type,
    preserving per-type order |
    | `ReplayDestination(id)` matching `replayTo` | short-circuits — records
    after the cut are NOT enqueued |
    | `ReplayDestination(id)` NOT matching `replayTo` | is silently skipped,
    iteration continues |
    | multiple matching `ReplayDestination` | stops at the FIRST one |
    | unknown record type (e.g. `TerminateSignal`) | throws
    `RuntimeException` with a diagnostic message |
    
    **Notes**
    
    While writing `ReplayLogGeneratorSpec` I discovered a **production
    stream-leak** in `ReplayLogGenerator.generate`: when it hits the
    matching `ReplayDestination` it short-circuits via a non-local `return`,
    leaving the `SequentialRecordReader`'s underlying `Input` stream open.
    On Windows the leaked file handle blocks subsequent temp-dir deletion.
    The spec's `cleanup` helper tolerates the resulting
    `FileSystemException` so the case still passes; the real fix is at the
    source and is out of scope for a test-coverage PR (will file a follow-up
    issue).
    
    `ReplayLogGeneratorSpec` uses a temp-dir-backed
    `VFSRecordStorage[ReplayLogRecord]` and the production
    `AmberRuntime.serde` (suite-local `ActorSystem` injected via reflection,
    torn down in `afterAll`) — same harness pattern as
    `CheckpointSubsystemSpec` / `ClientEventSpec` / `VFSRecordStorageSpec`.
    
    ### Any related issues, documentation, discussions?
    
    Closes #5550.
    
    ### How was this PR tested?
    
    Pure unit-test additions; verified locally with:
    
    - `sbt "WorkflowExecutionService/testOnly
    org.apache.texera.amber.engine.architecture.logreplay.EmptyReplayLoggerSpec
    
org.apache.texera.amber.engine.architecture.logreplay.ReplayLogGeneratorSpec"`
    — 19 tests, all green
    - `sbt scalafmtCheckAll` — clean
    - CI to confirm
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Sonnet 4.5)
---
 .../logreplay/EmptyReplayLoggerSpec.scala          | 130 ++++++++
 .../logreplay/ReplayLogGeneratorSpec.scala         | 361 +++++++++++++++++++++
 2 files changed, 491 insertions(+)

diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/EmptyReplayLoggerSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/EmptyReplayLoggerSpec.scala
new file mode 100644
index 0000000000..ccd7fde8f9
--- /dev/null
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/EmptyReplayLoggerSpec.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.logreplay
+
+import org.apache.texera.amber.core.virtualidentity.{
+  ActorVirtualIdentity,
+  ChannelIdentity,
+  EmbeddedControlMessageIdentity
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class EmptyReplayLoggerSpec extends AnyFlatSpec {
+
+  // 
---------------------------------------------------------------------------
+  // Fixtures
+  // 
---------------------------------------------------------------------------
+
+  private val channelId: ChannelIdentity =
+    ChannelIdentity(ActorVirtualIdentity("from"), ActorVirtualIdentity("to"), 
isControl = false)
+  private val ecmId: EmbeddedControlMessageIdentity = 
EmbeddedControlMessageIdentity("test-ecm")
+
+  // 
---------------------------------------------------------------------------
+  // drainCurrentLogRecords — always empty
+  // 
---------------------------------------------------------------------------
+
+  "EmptyReplayLogger.drainCurrentLogRecords" should
+    "return an empty Array[ReplayLogRecord] regardless of the step argument" 
in {
+    val logger = new EmptyReplayLogger
+    val r0 = logger.drainCurrentLogRecords(0L)
+    val r1 = logger.drainCurrentLogRecords(1L)
+    val rMax = logger.drainCurrentLogRecords(Long.MaxValue)
+    val rNeg = logger.drainCurrentLogRecords(-1L)
+    assert(r0.isEmpty)
+    assert(r1.isEmpty)
+    assert(rMax.isEmpty)
+    assert(rNeg.isEmpty)
+  }
+
+  it should "return a non-null array (callers iterate it without 
null-checking)" in {
+    val logger = new EmptyReplayLogger
+    val r = logger.drainCurrentLogRecords(42L)
+    assert(r != null)
+    assert(r.length == 0)
+  }
+
+  it should "return arrays whose element type is ReplayLogRecord (compile-time 
enforced)" in {
+    // If a future refactor accidentally widened the return type to
+    // `Array[AnyRef]`, this would fail to typecheck. Pin the contract.
+    val logger = new EmptyReplayLogger
+    val r: Array[ReplayLogRecord] = logger.drainCurrentLogRecords(0L)
+    assert(r.length == 0)
+  }
+
+  // 
---------------------------------------------------------------------------
+  // markAsReplayDestination — no-op
+  // 
---------------------------------------------------------------------------
+
+  "EmptyReplayLogger.markAsReplayDestination" should
+    "accept any EmbeddedControlMessageIdentity without throwing" in {
+    val logger = new EmptyReplayLogger
+    logger.markAsReplayDestination(ecmId) // must not throw
+    // Calling twice with the same id is still a no-op.
+    logger.markAsReplayDestination(ecmId)
+    succeed
+  }
+
+  it should "leave drainCurrentLogRecords output untouched (no internal buffer 
accumulates)" in {
+    val logger = new EmptyReplayLogger
+    logger.markAsReplayDestination(ecmId)
+    logger.markAsReplayDestination(EmbeddedControlMessageIdentity("another"))
+    assert(logger.drainCurrentLogRecords(0L).isEmpty)
+  }
+
+  // 
---------------------------------------------------------------------------
+  // logCurrentStepWithMessage — no-op
+  // 
---------------------------------------------------------------------------
+
+  "EmptyReplayLogger.logCurrentStepWithMessage" should
+    "accept any (step, channelId, msg) triple without throwing" in {
+    val logger = new EmptyReplayLogger
+    logger.logCurrentStepWithMessage(0L, channelId, msg = None)
+    logger.logCurrentStepWithMessage(1L, channelId, msg = None)
+    logger.logCurrentStepWithMessage(Long.MaxValue, channelId, msg = None)
+    succeed
+  }
+
+  it should "tolerate a None msg argument (the null-object's job is to absorb 
every call)" in {
+    val logger = new EmptyReplayLogger
+    logger.logCurrentStepWithMessage(7L, channelId, msg = None)
+    // Verify nothing was queued in the process.
+    assert(logger.drainCurrentLogRecords(7L).isEmpty)
+  }
+
+  it should "leave drainCurrentLogRecords output empty even after many calls" 
in {
+    val logger = new EmptyReplayLogger
+    (1L to 100L).foreach(i => logger.logCurrentStepWithMessage(i, channelId, 
msg = None))
+    assert(logger.drainCurrentLogRecords(100L).isEmpty)
+  }
+
+  // 
---------------------------------------------------------------------------
+  // ReplayLogger trait conformance
+  // 
---------------------------------------------------------------------------
+  //
+  // The null-object pattern requires EmptyReplayLogger to be a drop-in for
+  // ReplayLogger callers — pin the upcast.
+
+  "EmptyReplayLogger" should "be usable through the ReplayLogger interface" in 
{
+    val logger: ReplayLogger = new EmptyReplayLogger
+    logger.logCurrentStepWithMessage(0L, channelId, msg = None)
+    logger.markAsReplayDestination(ecmId)
+    assert(logger.drainCurrentLogRecords(0L).isEmpty)
+  }
+}
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/ReplayLogGeneratorSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/ReplayLogGeneratorSpec.scala
new file mode 100644
index 0000000000..e702f6ca5f
--- /dev/null
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/ReplayLogGeneratorSpec.scala
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.logreplay
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.serialization.{Serialization, SerializationExtension}
+import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.core.virtualidentity.{
+  ActorVirtualIdentity,
+  ChannelIdentity,
+  EmbeddedControlMessageIdentity
+}
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, 
WorkflowFIFOMessage}
+import org.apache.texera.amber.engine.common.storage.{SequentialRecordStorage, 
VFSRecordStorage}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.nio.file.{Files, Path}
+
+class ReplayLogGeneratorSpec extends AnyFlatSpec with BeforeAndAfterAll {
+
+  // 
---------------------------------------------------------------------------
+  // Suite-local Pekko serde injected into AmberRuntime via reflection
+  // 
---------------------------------------------------------------------------
+  //
+  // `SequentialRecordWriter.writeRecord` hard-codes `AmberRuntime.serde`,
+  // so any test that round-trips records through VFSRecordStorage needs
+  // AmberRuntime initialized. Pattern matches CheckpointSubsystemSpec /
+  // ClientEventSpec — own a suite-local ActorSystem, inject it into
+  // AmberRuntime's private vars via reflection, tear down in afterAll.
+
+  private val testSystem: ActorSystem =
+    ActorSystem("ReplayLogGeneratorSpec-test", AmberRuntime.pekkoConfig)
+  private val testSerde: Serialization = SerializationExtension(testSystem)
+
+  private def getAmberRuntimeField(name: String): AnyRef = {
+    val field = AmberRuntime.getClass.getDeclaredField(name)
+    field.setAccessible(true)
+    field.get(AmberRuntime)
+  }
+
+  private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
+    val field = AmberRuntime.getClass.getDeclaredField(name)
+    field.setAccessible(true)
+    field.set(AmberRuntime, value)
+  }
+
+  // Capture whatever AmberRuntime held before we overwrite it so afterAll can
+  // restore it. Unconditionally nulling the fields would clobber an already
+  // initialized AmberRuntime and couple this suite to test execution order.
+  private var prevActorSystem: AnyRef = _
+  private var prevSerde: AnyRef = _
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    prevActorSystem = getAmberRuntimeField("_actorSystem")
+    prevSerde = getAmberRuntimeField("_serde")
+    setAmberRuntimeField("_actorSystem", testSystem)
+    setAmberRuntimeField("_serde", testSerde)
+  }
+
+  override protected def afterAll(): Unit = {
+    setAmberRuntimeField("_serde", prevSerde)
+    setAmberRuntimeField("_actorSystem", prevActorSystem)
+    TestKit.shutdownActorSystem(testSystem)
+    super.afterAll()
+  }
+
+  private val isWindows: Boolean =
+    System.getProperty("os.name", "").toLowerCase.contains("win")
+
+  // Best-effort temp-dir cleanup. `Files.walk` returns a closeable Stream
+  // backed by an open directory handle — wrap in try/finally so the
+  // handle is released even if traversal throws.
+  //
+  // On Windows we tolerate `FileSystemException` on `deleteIfExists` because
+  // `ReplayLogGenerator.generate` short-circuits at `ReplayDestination`
+  // via a non-local `return`, which leaks the underlying
+  // `SequentialRecordReader.Input` stream — and a leaked open file handle
+  // blocks the temp file from being deleted there. That is a production bug
+  // to fix separately; in-test we just let the OS reap the temp files later
+  // instead of failing the case. On other platforms an open handle does not
+  // block deletion, so a `FileSystemException` signals a real problem and is
+  // allowed to propagate.
+  private def cleanup(sub: Path): Unit = {
+    val root = sub.getParent
+    if (root == null || !Files.exists(root)) return
+    val stream = Files.walk(root)
+    try {
+      stream
+        .sorted(java.util.Comparator.reverseOrder())
+        .forEach { child =>
+          try Files.deleteIfExists(child)
+          catch { case _: java.nio.file.FileSystemException if isWindows => () 
}
+        }
+    } finally {
+      stream.close()
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Fixtures
+  // 
---------------------------------------------------------------------------
+
+  private val cid: ChannelIdentity =
+    ChannelIdentity(ActorVirtualIdentity("from"), ActorVirtualIdentity("to"), 
isControl = false)
+  private val destA: EmbeddedControlMessageIdentity = 
EmbeddedControlMessageIdentity("dest-A")
+  private val destB: EmbeddedControlMessageIdentity = 
EmbeddedControlMessageIdentity("dest-B")
+
+  private def newStorage(): (Path, SequentialRecordStorage[ReplayLogRecord]) = 
{
+    val root = Files.createTempDirectory("replay-log-generator-spec-")
+    val sub = root.resolve("logs")
+    val storage = new VFSRecordStorage[ReplayLogRecord](sub.toUri)
+    (sub, storage)
+  }
+
+  private def writeLog(
+      storage: SequentialRecordStorage[ReplayLogRecord],
+      records: Seq[ReplayLogRecord]
+  ): Unit = {
+    val writer = storage.getWriter("log")
+    // Close in a finally so a serialization failure mid-write does not leak
+    // the underlying output stream (which would otherwise block temp-dir
+    // cleanup, especially on Windows).
+    try {
+      records.foreach(writer.writeRecord)
+      writer.flush()
+    } finally {
+      writer.close()
+    }
+  }
+
+  private def msg(seq: Long): WorkflowFIFOMessage =
+    WorkflowFIFOMessage(cid, seq, DataFrame(Array.empty))
+
+  // 
---------------------------------------------------------------------------
+  // Empty storage
+  // 
---------------------------------------------------------------------------
+
+  "ReplayLogGenerator.generate" should
+    "return empty queues when the storage is an EmptyRecordStorage" in {
+    val storage = SequentialRecordStorage.getStorage[ReplayLogRecord](None)
+    val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+    assert(steps.isEmpty)
+    assert(messages.isEmpty)
+  }
+
+  it should "return empty queues when the storage file exists but holds no 
records" in {
+    val (sub, storage) = newStorage()
+    try {
+      writeLog(storage, Seq.empty)
+      val (steps, messages) = ReplayLogGenerator.generate(storage, "log", 
destA)
+      assert(steps.isEmpty)
+      assert(messages.isEmpty)
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Partitioning by record type
+  // 
---------------------------------------------------------------------------
+
+  it should "enqueue all ProcessingStep records into the steps queue 
(preserving order)" in {
+    val (sub, storage) = newStorage()
+    try {
+      val recs = Seq(
+        ProcessingStep(cid, 1L),
+        ProcessingStep(cid, 2L),
+        ProcessingStep(cid, 3L)
+      )
+      writeLog(storage, recs)
+      val (steps, messages) = ReplayLogGenerator.generate(storage, "log", 
destA)
+      assert(steps.toList == recs)
+      assert(messages.isEmpty)
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  it should "enqueue all MessageContent records into the messages queue 
(preserving order)" in {
+    val (sub, storage) = newStorage()
+    try {
+      val m1 = msg(1L)
+      val m2 = msg(2L)
+      writeLog(storage, Seq(MessageContent(m1), MessageContent(m2)))
+      val (steps, messages) = ReplayLogGenerator.generate(storage, "log", 
destA)
+      assert(steps.isEmpty)
+      // The pair is round-tripped through Kryo; the message reference is
+      // not preserved (a fresh deserialized copy comes back) — so compare
+      // by case-class equality, not `eq`.
+      assert(messages.toList == List(m1, m2))
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  it should
+    "partition steps and messages independently when records are interleaved" 
in {
+    val (sub, storage) = newStorage()
+    try {
+      val s1 = ProcessingStep(cid, 1L)
+      val s2 = ProcessingStep(cid, 2L)
+      val m1 = msg(1L)
+      val m2 = msg(2L)
+      writeLog(
+        storage,
+        Seq(s1, MessageContent(m1), s2, MessageContent(m2))
+      )
+      val (steps, messages) = ReplayLogGenerator.generate(storage, "log", 
destA)
+      assert(steps.toList == List(s1, s2))
+      assert(messages.toList == List(m1, m2))
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // ReplayDestination — early termination + skip semantics
+  // 
---------------------------------------------------------------------------
+
+  it should
+    "short-circuit at the matching ReplayDestination, ignoring records that 
follow it" in {
+    val (sub, storage) = newStorage()
+    try {
+      val s1 = ProcessingStep(cid, 1L)
+      val m1 = msg(1L)
+      val m2Past = msg(2L) // must NOT appear in the result
+      val s2Past = ProcessingStep(cid, 99L) // must NOT appear either
+      writeLog(
+        storage,
+        Seq(
+          s1,
+          MessageContent(m1),
+          ReplayDestination(destA), // <-- replayTo target; iteration stops 
here
+          MessageContent(m2Past),
+          s2Past
+        )
+      )
+      val (steps, messages) = ReplayLogGenerator.generate(storage, "log", 
destA)
+      assert(steps.toList == List(s1))
+      assert(messages.toList == List(m1))
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  it should
+    "silently skip a ReplayDestination whose id does not match replayTo 
(iteration continues)" in {
+    val (sub, storage) = newStorage()
+    try {
+      val s1 = ProcessingStep(cid, 1L)
+      val m1 = msg(1L)
+      writeLog(
+        storage,
+        Seq(
+          s1,
+          ReplayDestination(destB), // <-- different id; skipped
+          MessageContent(m1)
+        )
+      )
+      val (steps, messages) = ReplayLogGenerator.generate(storage, "log", 
destA)
+      assert(steps.toList == List(s1))
+      assert(messages.toList == List(m1))
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  it should
+    "stop at the FIRST matching ReplayDestination when multiple matching 
records exist" in {
+    val (sub, storage) = newStorage()
+    try {
+      val s1 = ProcessingStep(cid, 1L)
+      val s2Past = ProcessingStep(cid, 2L)
+      writeLog(
+        storage,
+        Seq(
+          s1,
+          ReplayDestination(destA), // <-- first match
+          s2Past, // <-- after the cut
+          ReplayDestination(destA)
+        )
+      )
+      val (steps, _) = ReplayLogGenerator.generate(storage, "log", destA)
+      assert(steps.toList == List(s1))
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Unknown record type — TerminateSignal triggers the `case other` branch
+  // 
---------------------------------------------------------------------------
+
+  it should "throw RuntimeException on an unhandled record type (e.g. 
TerminateSignal)" in {
+    val (sub, storage) = newStorage()
+    try {
+      writeLog(storage, Seq(TerminateSignal))
+      val ex = intercept[RuntimeException] {
+        ReplayLogGenerator.generate(storage, "log", destA)
+      }
+      assert(
+        ex.getMessage.toLowerCase.contains("cannot handle"),
+        s"expected diagnostic message about unhandled record, got: 
${ex.getMessage}"
+      )
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Mixed-record full-cycle
+  // 
---------------------------------------------------------------------------
+
+  it should "handle a realistic mix of steps + messages + non-matching 
destinations" in {
+    val (sub, storage) = newStorage()
+    try {
+      val s1 = ProcessingStep(cid, 10L)
+      val s2 = ProcessingStep(cid, 20L)
+      val s3 = ProcessingStep(cid, 30L)
+      val m1 = msg(1L)
+      val m2 = msg(2L)
+      writeLog(
+        storage,
+        Seq(
+          s1,
+          MessageContent(m1),
+          ReplayDestination(destB), // skipped (id mismatch)
+          s2,
+          MessageContent(m2),
+          ReplayDestination(destB), // also skipped
+          s3
+        )
+      )
+      val (steps, messages) = ReplayLogGenerator.generate(storage, "log", 
destA)
+      assert(steps.toList == List(s1, s2, s3))
+      assert(messages.toList == List(m1, m2))
+    } finally {
+      cleanup(sub)
+    }
+  }
+}

Reply via email to