This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5150-bf2f92c81bed8520d6421bdb4b820ca61667ca2a
in repository https://gitbox.apache.org/repos/asf/texera.git

commit a7d4bc622f639aa3445d2c199ae4bbded28b8d1c
Author: Matthew B. <[email protected]>
AuthorDate: Fri May 22 23:05:14 2026 -0700

    fix: drop withDefaultValue from StatisticsManager so checkpoint state 
round-trips (#5150)
    
    ### What changes were proposed in this PR?
    `StatisticsManager` declared its input/output stats maps as
    `mutable.Map.empty.withDefaultValue((0L, 0L))`. The resulting
    `Map.WithDefault` wrapper does not survive a Kryo round-trip (its inner
    map deserializes as null), so
    `chkpt.load(CP_STATE_KEY)` on a default-state `ControllerProcessor`
    throws `KryoException: NullPointerException`, blocking
    `Controller.loadFromCheckpoint` from ever rehydrating a checkpointed
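    controller. This PR removes the wrapper and replaces the default-value
    lookups at the two write sites (`increaseInputStatistics` and
    `increaseOutputStatistics`) with `updateWith`, which starts a port's
    entry at `(1L, size)` the first time that port is seen; the recorded
    per-port counts and sizes are unchanged.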
    ### Any related issues, documentation, or discussions?
    closes: #4686
    ### How was this PR tested?
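    Replaced the two existing `should be serializable` cases in
    `CheckpointSpec` with full save-then-load round-trips (controller and
    worker) that assert `restored.actorId == original.actorId`; the new
    tests reproduce the original NPE on `main` and pass after the fix.
    Also extended the output-statistics case in `WorkerManagersSpec` to
    record tuples on two ports and assert the per-port counts and sizes.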
    Verified locally with `sbt 'WorkflowExecutionService / Test / testOnly
    org.apache.texera.amber.engine.faulttolerance.CheckpointSpec'` (3/3
    pass).
    ### Was this PR authored or co-authored using generative AI tooling?
    Co-authored with Claude Opus 4.7 in compliance with the ASF
    Generative Tooling Guidance.
    
    ---------
    
    Signed-off-by: Matthew B. <[email protected]>
---
 .../worker/managers/StatisticsManager.scala             | 17 +++++++++++------
 .../worker/managers/WorkerManagersSpec.scala            | 14 +++++++++-----
 .../amber/engine/faulttolerance/CheckpointSpec.scala    |  8 ++++++--
 3 files changed, 26 insertions(+), 13 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala
index 8ae0419f0a..ab46e17654 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala
@@ -31,10 +31,11 @@ import scala.collection.mutable
 
 class StatisticsManager {
   // DataProcessor
+  // Plain maps (no withDefaultValue) so they survive Kryo round-trip.
   private val inputStatistics: mutable.Map[PortIdentity, (Long, Long)] =
-    mutable.Map.empty.withDefaultValue((0L, 0L))
+    mutable.Map.empty
   private val outputStatistics: mutable.Map[PortIdentity, (Long, Long)] =
-    mutable.Map.empty.withDefaultValue((0L, 0L))
+    mutable.Map.empty
   private var dataProcessingTime: Long = 0L
   private var totalExecutionTime: Long = 0L
   private var workerStartTime: Long = 0L
@@ -82,8 +83,10 @@ class StatisticsManager {
     */
   def increaseInputStatistics(portId: PortIdentity, size: Long): Unit = {
     require(size >= 0, "Tuple size must be non-negative")
-    val (count, totalSize) = inputStatistics(portId)
-    inputStatistics.update(portId, (count + 1, totalSize + size))
+    inputStatistics.updateWith(portId) {
+      case Some((count, totalSize)) => Some((count + 1, totalSize + size))
+      case None                     => Some((1L, size))
+    }
   }
 
   /**
@@ -93,8 +96,10 @@ class StatisticsManager {
     */
   def increaseOutputStatistics(portId: PortIdentity, size: Long): Unit = {
     require(size >= 0, "Tuple size must be non-negative")
-    val (count, totalSize) = outputStatistics(portId)
-    outputStatistics.update(portId, (count + 1, totalSize + size))
+    outputStatistics.updateWith(portId) {
+      case Some((count, totalSize)) => Some((count + 1, totalSize + size))
+      case None                     => Some((1L, size))
+    }
   }
 
   /**
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala
index 3fbff39148..1932823f5d 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala
@@ -76,11 +76,15 @@ class WorkerManagersSpec extends AnyFlatSpec {
     val sm = new StatisticsManager()
     sm.increaseOutputStatistics(PortIdentity(0), 30)
     sm.increaseOutputStatistics(PortIdentity(0), 70)
-    assert(sm.getOutputTupleCount == 2L)
-    val out = sm.getStatistics(nullExec).outputTupleMetrics
-    assert(out.size == 1)
-    assert(out.head.tupleMetrics.count == 2L)
-    assert(out.head.tupleMetrics.size == 100L)
+    sm.increaseOutputStatistics(PortIdentity(1), 25)
+    assert(sm.getOutputTupleCount == 3L)
+    val byPort = sm
+      .getStatistics(nullExec)
+      .outputTupleMetrics
+      .map(m => m.portId -> (m.tupleMetrics.count, m.tupleMetrics.size))
+      .toMap
+    assert(byPort(PortIdentity(0)) == (2L, 100L))
+    assert(byPort(PortIdentity(1)) == (1L, 25L))
   }
 
   it should "reject negative tuple sizes" in {
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
index fbc7e8044d..3d207fd23b 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
@@ -63,7 +63,7 @@ class CheckpointSpec extends AnyFlatSpecLike with 
BeforeAndAfterAll {
     system.actorOf(Props[SingleNodeListener](), "cluster-info")
   }
 
-  "Default controller state" should "be serializable" in {
+  "Default controller state" should "round-trip through CheckpointState" in {
     val cp =
       new ControllerProcessor(
         workflow.context,
@@ -73,9 +73,11 @@ class CheckpointSpec extends AnyFlatSpecLike with 
BeforeAndAfterAll {
       )
     val chkpt = new CheckpointState()
     chkpt.save(CP_STATE_KEY, cp)
+    val restored: ControllerProcessor = chkpt.load(CP_STATE_KEY)
+    assert(restored.actorId == cp.actorId)
   }
 
-  "Default worker state" should "be serializable" in {
+  "Default worker state" should "round-trip through CheckpointState" in {
     val dp = new DataProcessor(
       SELF,
       msg => {},
@@ -83,6 +85,8 @@ class CheckpointSpec extends AnyFlatSpecLike with 
BeforeAndAfterAll {
     )
     val chkpt = new CheckpointState()
     chkpt.save(DP_STATE_KEY, dp)
+    val restored: DataProcessor = chkpt.load(DP_STATE_KEY)
+    assert(restored.actorId == dp.actorId)
   }
 
   "CheckpointState" should "fail loudly on an unknown key" in {

Reply via email to