This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-state-materialization
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-state-materialization
by this push:
new 8e27025f6a docs: explain state-before-tuple replay order in materialization readers
8e27025f6a is described below
commit 8e27025f6a93b72a99aa582d42ca906868349686
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue May 5 22:15:56 2026 -0700
docs: explain state-before-tuple replay order in materialization readers
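Address PR #4490 review comment 3192875005: document why the
input-port materialization readers (Python runnable and Scala thread)
replay states before tuples: states and tuples are stored in separate
tables, so the original interleaving is lost, and downstream operators
typically need their state in place before processing incoming tuples.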
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
.../storage/runnables/input_port_materialization_reader_runnable.py | 5 +++++
.../worker/managers/InputPortMaterializationReaderThread.scala | 4 ++++
2 files changed, 9 insertions(+)
diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index af847621ef..67c6370c00 100644
--- a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -148,6 +148,11 @@ class InputPortMaterializationReaderRunnable(Runnable, Stoppable):
)
self.emit_ecm("StartChannel",
EmbeddedControlMessageType.NO_ALIGNMENT)
+ # States and tuples are persisted to separate tables, so
+ # the original interleaving is lost and replay has to pick
+ # an order: we replay states first because downstream
+ # operators typically need their state set up before they
+ # process the incoming tuples.
state_document, _ = DocumentFactory.open_document(
State.uri_from_result_uri(self.uri)
)
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
index 7439430ca2..586d6ba207 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
@@ -89,6 +89,10 @@ class InputPortMaterializationReaderThread(
// Notify the input port of start of input channel
emitECM(METHOD_START_CHANNEL, NO_ALIGNMENT)
try {
+ // States and tuples are persisted to separate tables, so the
+ // original interleaving is lost and replay has to pick an order:
+ // we replay states first because downstream operators typically
+ // need their state set up before they process the incoming tuples.
val stateDocument =
DocumentFactory
.openDocument(State.uriFromResultUri(uri))