This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-state-materialization
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-state-materialization by this push:
     new df9e360d65 docs: note state replay broadcasts to every worker
df9e360d65 is described below

commit df9e360d6562d14baa516fa86f631ad1ffc358b6
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue May 5 23:08:32 2026 -0700

    docs: note state replay broadcasts to every worker
    
    Address PR #4490 review comment 3192889029: explain why the state
    loop intentionally enqueues every row to every downstream worker
    while the tuple loop filters by partitioner -- state is shared
    context, not per-key data.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../worker/managers/InputPortMaterializationReaderThread.scala        | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
index 586d6ba207..98c68731ae 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
@@ -99,6 +99,10 @@ class InputPortMaterializationReaderThread(
           ._1
           .asInstanceOf[VirtualDocument[Tuple]]
       val stateReadIterator = stateDocument.get()
+      // Every state is broadcast to every downstream worker -- no
+      // partitioner filtering here, unlike the tuple loop below. State
+      // is shared context (e.g. config / counters), not per-key data,
+      // so each worker needs the full set.
       while (stateReadIterator.hasNext) {
         val state = State.fromTuple(stateReadIterator.next())
         inputMessageQueue.put(

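The commit message contrasts two routing rules: data tuples are filtered by a partitioner, so each tuple reaches one worker, while replayed state rows are broadcast, so every worker sees the full set. The Scala sketch below illustrates that difference under stated assumptions. It assumes a simple hash partitioner on a string key. `StateReplaySketch`, `SimpleTuple`, `SimpleState`, `partitionTuples`, and `broadcastStates` are hypothetical names for illustration only; they are not Texera's `Tuple`, `State`, or partitioner APIs.

```scala
// Hypothetical sketch, not Texera code: contrasts partitioned tuple routing
// with broadcast state routing across downstream workers.
object StateReplaySketch {

  // Simplified stand-ins for per-key data and shared context (assumption).
  final case class SimpleTuple(key: String, value: Int)
  final case class SimpleState(entries: Map[String, Any])

  // Tuple loop analogue: a hash partitioner assigns each tuple to exactly one worker.
  def partitionTuples(tuples: Seq[SimpleTuple], numWorkers: Int): Map[Int, Seq[SimpleTuple]] = {
    require(numWorkers > 0, "numWorkers must be positive")
    val routed = tuples.groupBy(t => Math.floorMod(t.key.hashCode, numWorkers))
    // Workers that received no tuples still get an (empty) entry.
    (0 until numWorkers).map(w => w -> routed.getOrElse(w, Seq.empty[SimpleTuple])).toMap
  }

  // State loop analogue: no partitioner filtering; every worker gets every state.
  def broadcastStates(states: Seq[SimpleState], numWorkers: Int): Map[Int, Seq[SimpleState]] = {
    require(numWorkers > 0, "numWorkers must be positive")
    (0 until numWorkers).map(w => w -> states).toMap
  }

  def main(args: Array[String]): Unit = {
    val tuples = Seq(SimpleTuple("a", 1), SimpleTuple("b", 2), SimpleTuple("c", 3), SimpleTuple("d", 4))
    val states = Seq(SimpleState(Map("threshold" -> 10)))
    val numWorkers = 2
    val tupleRouting = partitionTuples(tuples, numWorkers)
    val stateRouting = broadcastStates(states, numWorkers)
    for (w <- 0 until numWorkers)
      println(s"worker $w: ${tupleRouting(w).size} tuple(s), ${stateRouting(w).size} state(s)")
  }
}
```

Summed over all workers, the tuple counts equal the input size, while each worker's state count equals the full number of states. That is the property the added code comment relies on when it says each worker needs the full set.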