This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new cc94414e60 test(amber): add unit test coverage for
DeadLetterMonitorActor (#5705)
cc94414e60 is described below
commit cc94414e6059944d304c7c85e9cebe03d6a2453d
Author: yangzhang75 <[email protected]>
AuthorDate: Tue Jun 23 18:47:14 2026 -0700
test(amber): add unit test coverage for DeadLetterMonitorActor (#5705)
<!--
Thanks for sending a pull request (PR)! Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
[Contributing to
Texera](https://github.com/apache/texera/blob/main/CONTRIBUTING.md)
2. Ensure you have added or run the appropriate tests for your PR
3. If the PR is work in progress, mark it a draft on GitHub.
4. Please write your PR title to summarize what this PR proposes, we
are following Conventional Commits style for PR titles as well.
5. Be sure to keep the PR description updated to reflect all changes.
-->
### What changes were proposed in this PR?
<!--
Please clarify what changes you are proposing. The purpose of this
section
is to outline the changes. Here are some tips for you:
1. If you propose a new API, clarify the use case for a new API.
2. If you fix a bug, you can clarify why it is a bug.
3. If it is a refactoring, clarify what has been changed.
3. It would be helpful to include a before-and-after comparison using
screenshots or GIFs.
4. Please consider writing useful notes for better and faster reviews.
-->
Adds DeadLetterMonitorActorSpec (Pekko TestKit) covering all three
branches: a NetworkMessage dead letter is forwarded to the original
sender as MessageBecomesDeadLetter; a non-NetworkMessage dead letter is
ignored; a non-DeadLetter message is ignored.
### Any related issues, documentation, discussions?
<!--
Please use this section to link other resources if not mentioned
already.
1. If this PR fixes an issue, please include `Fixes #1234`, `Resolves
#1234`
or `Closes #1234`. If it is only related, simply mention the issue
number.
2. If there is design documentation, please add the link.
3. If there is a discussion in the mailing list, please add the link.
-->
Closes #5663
### How was this PR tested?
<!--
If tests were added, say they were added here. Or simply mention that if
the PR
is tested with existing test cases. Make sure to include/update test
cases that
check the changes thoroughly including negative and positive cases if
possible.
If it was tested in a way different from regular unit tests, please
clarify how
you tested step by step, ideally copy and paste-able, so that other
reviewers can
test and check, and descendants can verify in the future. If tests were
not added,
please describe why they were not added and/or why it was difficult to
add.
-->
New spec run via sbt "amber/testOnly *DeadLetterMonitorActorSpec": 3
tests pass. scalafmt clean.
### Was this PR authored or co-authored using generative AI tooling?
<!--
If generative AI tooling has been used in the process of authoring this
PR,
please include the phrase: 'Generated-by: ' followed by the name of the
tool
and its version. If no, write 'No'.
Please refer to the [ASF Generative Tooling
Guidance](https://www.apache.org/legal/generative-tooling.html) for
details.
-->
Generated-by: Claude Code (Claude Opus 4.8)
---
.../DeadLetterMonitorActorSpec.scala | 88 ++++++++++++++++++++++
1 file changed, 88 insertions(+)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/DeadLetterMonitorActorSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/DeadLetterMonitorActorSpec.scala
new file mode 100644
index 0000000000..49459d699b
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/DeadLetterMonitorActorSpec.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.messaginglayer
+
+import org.apache.pekko.actor.{ActorSystem, DeadLetter, Props}
+import org.apache.pekko.testkit.{ImplicitSender, TestKit, TestProbe}
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema, TupleLike}
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity,
ChannelIdentity}
+import org.apache.texera.amber.engine.architecture.common.WorkflowActor.{
+ MessageBecomesDeadLetter,
+ NetworkMessage
+}
+import org.apache.texera.amber.engine.common.ambermessage.{DataFrame,
WorkflowFIFOMessage}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpecLike
+
+import scala.concurrent.duration.DurationInt
+
+class DeadLetterMonitorActorSpec
+ extends TestKit(ActorSystem("DeadLetterMonitorActorSpec"))
+ with ImplicitSender
+ with AnyFlatSpecLike
+ with BeforeAndAfterAll {
+
+ override def afterAll(): Unit = {
+ TestKit.shutdownActorSystem(system)
+ }
+
+ private val channelId =
+ ChannelIdentity(
+ ActorVirtualIdentity("sender"),
+ ActorVirtualIdentity("receiver"),
+ isControl = false
+ )
+
+ private def aNetworkMessage(): NetworkMessage = {
+ val payload = DataFrame(
+ Array(TupleLike(1) enforceSchema Schema().add("field1",
AttributeType.INTEGER))
+ )
+ NetworkMessage(0, WorkflowFIFOMessage(channelId, 0, payload))
+ }
+
+ "DeadLetterMonitorActor" should "forward MessageBecomesDeadLetter to the
original sender for a NetworkMessage dead letter" in {
+ val monitor = system.actorOf(Props(new DeadLetterMonitorActor()))
+ val originalSender = TestProbe()
+ val recipient = TestProbe()
+ val message = aNetworkMessage()
+
+ monitor ! DeadLetter(message, originalSender.ref, recipient.ref)
+
+ originalSender.expectMsg(MessageBecomesDeadLetter(message))
+ }
+
+ it should "ignore a dead letter whose payload is not a NetworkMessage" in {
+ val monitor = system.actorOf(Props(new DeadLetterMonitorActor()))
+ val originalSender = TestProbe()
+ val recipient = TestProbe()
+
+ monitor ! DeadLetter("not a network message", originalSender.ref,
recipient.ref)
+
+ originalSender.expectNoMessage(200.millis)
+ }
+
+ it should "ignore messages that are not dead letters" in {
+ val monitor = system.actorOf(Props(new DeadLetterMonitorActor()))
+
+ monitor ! "some unrelated message"
+
+ expectNoMessage(200.millis)
+ }
+}