This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new cee408b755 test(amber): add unit test coverage for checkpoint
subsystem (#4838)
cee408b755 is described below
commit cee408b7554c8194d2f404d739d71bf0a6941f65
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 13:35:54 2026 -0700
test(amber): add unit test coverage for checkpoint subsystem (#4838)
### What changes were proposed in this PR?
Add `CheckpointSubsystemSpec` covering the three classes/traits in
`amber/.../engine/common` that make up the checkpoint data contract:
- `SerializedState`: well-known key constants pinned (`CP_STATE_KEY`,
`DP_STATE_KEY`, `IN_FLIGHT_MSG_KEY`, `DP_QUEUED_MSG_KEY`,
`OUTPUT_MSG_KEY`); `fromObject` / `toObject` round-trip via the
configured Pekko Serialization; field-shape and size accounting
- `CheckpointState`: defaults, save/load round-trip on primitive and
String values, overwrite-by-key, multi-key independence, missing-key
RuntimeException, size summation across entries
- `CheckpointSupport` trait: implementable by a custom subclass
forwarding to a `CheckpointState`, exercising `serializeState` /
`deserializeState` / `getEstimatedCheckpointCost`
Complements the in-flight #4687 (CheckpointState unknown-key negative
test in `CheckpointSpec`) with dedicated unit-level coverage.
### Any related issues, documentation, discussions?
Closes #4837
### How was this PR tested?
`sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.common.CheckpointSubsystemSpec"` - 11/11
tests pass.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../engine/common/CheckpointSubsystemSpec.scala | 177 +++++++++++++++++++++
1 file changed, 177 insertions(+)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/common/CheckpointSubsystemSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/common/CheckpointSubsystemSpec.scala
new file mode 100644
index 0000000000..45b1727afc
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/common/CheckpointSubsystemSpec.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.serialization.{Serialization, SerializationExtension}
+import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.core.tuple.TupleLike
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+
+class CheckpointSubsystemSpec extends AnyFlatSpec with BeforeAndAfterAll {
+
+ // Suite-local actor system. We also inject it into AmberRuntime via
+ // reflection so that CheckpointState.save/load (which hard-code
+ // AmberRuntime.serde) reuse the same system. Both the suite-local system
+ // and AmberRuntime's reference are torn down in afterAll, so no Pekko
+ // threads outlive the test (matching ControllerSpec/WorkerSpec hygiene).
+ private val testSystem: ActorSystem =
+ ActorSystem("CheckpointSubsystemSpec-test", AmberRuntime.akkaConfig)
+ private val testSerde: Serialization = SerializationExtension(testSystem)
+
+ private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
+ val field = AmberRuntime.getClass.getDeclaredField(name)
+ field.setAccessible(true)
+ field.set(AmberRuntime, value)
+ }
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ setAmberRuntimeField("_actorSystem", testSystem)
+ setAmberRuntimeField("_serde", testSerde)
+ }
+
+ override protected def afterAll(): Unit = {
+ setAmberRuntimeField("_serde", null)
+ setAmberRuntimeField("_actorSystem", null)
+ TestKit.shutdownActorSystem(testSystem)
+ super.afterAll()
+ }
+
+ //
---------------------------------------------------------------------------
+ // SerializedState
+ //
---------------------------------------------------------------------------
+
+ "SerializedState" should "expose stable well-known key constants" in {
+ // These constants are referenced from outside the engine; pin the strings
+ // so a rename surfaces as a test failure.
+ assert(SerializedState.CP_STATE_KEY == "Amber_CPState")
+ assert(SerializedState.DP_STATE_KEY == "Amber_DPState")
+ assert(SerializedState.IN_FLIGHT_MSG_KEY == "Amber_Inflight_Messages")
+ assert(SerializedState.DP_QUEUED_MSG_KEY == "Amber_DP_Queued_Messages")
+ assert(SerializedState.OUTPUT_MSG_KEY == "Amber_Output_Messages")
+ }
+
+ it should "round-trip a value through fromObject / toObject using a
suite-local Serialization" in {
+ // Use the suite-local serde directly so this case does not even touch
+ // AmberRuntime.
+ val original: java.lang.Integer = Integer.valueOf(42)
+ val state = SerializedState.fromObject(original, testSerde)
+ assert(state.bytes.length > 0)
+ assert(state.size() == state.bytes.length.toLong)
+ val restored = state.toObject[java.lang.Integer](testSerde)
+ assert(restored == original)
+ }
+
+ it should "carry the serializer id and manifest given at construction" in {
+ val s = SerializedState(Array[Byte](1, 2, 3), serializerId = 7, manifest =
"manifest-x")
+ assert(s.bytes.toSeq == Seq[Byte](1, 2, 3))
+ assert(s.serializerId == 7)
+ assert(s.manifest == "manifest-x")
+ assert(s.size() == 3L)
+ }
+
+ //
---------------------------------------------------------------------------
+ // CheckpointState
+ //
---------------------------------------------------------------------------
+
+ "CheckpointState" should "default to size = 0 with no entries" in {
+ val cp = new CheckpointState()
+ assert(cp.size() == 0L)
+ assert(!cp.has("anything"))
+ }
+
+ "CheckpointState.save / load" should "round-trip a primitive value" in {
+ val cp = new CheckpointState()
+ cp.save("answer", java.lang.Integer.valueOf(42))
+ assert(cp.has("answer"))
+ val restored: java.lang.Integer = cp.load[java.lang.Integer]("answer")
+ assert(restored == java.lang.Integer.valueOf(42))
+ }
+
+ it should "round-trip a String value" in {
+ val cp = new CheckpointState()
+ cp.save("greeting", "hello")
+ assert(cp.load[String]("greeting") == "hello")
+ }
+
+ it should "overwrite a previously saved key" in {
+ val cp = new CheckpointState()
+ cp.save("k", java.lang.Integer.valueOf(1))
+ cp.save("k", java.lang.Integer.valueOf(2))
+ assert(cp.load[java.lang.Integer]("k") == java.lang.Integer.valueOf(2))
+ }
+
+ it should "track distinct keys independently" in {
+ val cp = new CheckpointState()
+ cp.save("a", "alpha")
+ cp.save("b", "beta")
+ assert(cp.load[String]("a") == "alpha")
+ assert(cp.load[String]("b") == "beta")
+ }
+
+ "CheckpointState.load" should "raise RuntimeException for an unknown key" in
{
+ val cp = new CheckpointState()
+ val ex = intercept[RuntimeException] {
+ cp.load[Any]("missing")
+ }
+ assert(ex.getMessage.contains("missing"))
+ }
+
+ "CheckpointState.size" should "be the sum of every entry's serialized byte
length" in {
+ val cp = new CheckpointState()
+ cp.save("a", "x")
+ val sizeAfterOne = cp.size()
+ assert(sizeAfterOne > 0L)
+ cp.save("b", "yy")
+ assert(cp.size() > sizeAfterOne)
+ }
+
+ //
---------------------------------------------------------------------------
+ // CheckpointSupport (trait shape)
+ //
---------------------------------------------------------------------------
+
+ "CheckpointSupport" should "be implementable by a custom subclass forwarding
to a CheckpointState" in {
+ val support = new CheckpointSupport {
+ override def serializeState(
+ currentIteratorState: Iterator[(TupleLike, Option[PortIdentity])],
+ checkpoint: CheckpointState
+ ): Iterator[(TupleLike, Option[PortIdentity])] = {
+ checkpoint.save("marker", java.lang.Integer.valueOf(1))
+ currentIteratorState
+ }
+
+ override def deserializeState(
+ checkpoint: CheckpointState
+ ): Iterator[(TupleLike, Option[PortIdentity])] = Iterator.empty
+
+ override def getEstimatedCheckpointCost: Long = 7L
+ }
+
+ val cp = new CheckpointState()
+ val out = support.serializeState(Iterator.empty, cp)
+ assert(out.isEmpty)
+ assert(cp.has("marker"))
+ assert(support.deserializeState(cp).isEmpty)
+ assert(support.getEstimatedCheckpointCost == 7L)
+ }
+}