This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 9ba490eea3 feat: add state materialization across regions (#4490)
9ba490eea3 is described below
commit 9ba490eea3f49415bc11dca2dd1e7af1780f1074
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu May 14 18:48:42 2026 -0700
feat: add state materialization across regions (#4490)
### What changes were proposed in this PR?
This PR adds state materialization as a general mechanism for passing
state across different regions.
- materialize state as a separate storage object alongside the result
storage
- store one serialized state per row
- use the same cross-language serialization format on both the Python and Scala/Java sides
- let downstream regions read back both data and state when needed
### Any related issues, documentation, discussions?
Closes #4489
### How was this PR tested?
- added a Python round-trip test for materialized state storage in
test_iceberg_document.py
- added a Scala round-trip test for materialized state storage in
IcebergDocumentSpec.scala
- manually tested with this workflow: [Test
States.json](https://github.com/user-attachments/files/27779044/Test.States.json)
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: ChatGPT (Codex), Claude Code (Claude Opus 4.7)
---------
Signed-off-by: Xinyuan Lin <[email protected]>
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
Co-authored-by: Xiaozhen Liu <[email protected]>
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
---
.github/workflows/build.yml | 37 +++
.../handlers/control/assign_port_handler.py | 6 +-
.../core/architecture/packaging/output_manager.py | 63 ++++-
amber/src/main/python/core/runnables/main_loop.py | 1 +
.../main/python/core/storage/document_factory.py | 107 +++++----
.../input_port_materialization_reader_runnable.py | 21 +-
.../src/main/python/core/storage/storage_config.py | 3 +
.../main/python/core/storage/vfs_uri_factory.py | 17 +-
amber/src/main/python/texera_run_python_worker.py | 2 +
.../messaginglayer/OutputManager.scala | 52 ++++-
.../pythonworker/PythonWorkflowWorker.scala | 1 +
.../scheduling/CostBasedScheduleGenerator.scala | 8 +-
.../ExpansionGreedyScheduleGenerator.scala | 4 +-
.../scheduling/RegionExecutionCoordinator.scala | 16 +-
.../scheduling/config/PortConfig.scala | 10 +-
.../InputPortMaterializationReaderThread.scala | 34 ++-
...d.scala => OutputPortStorageWriterThread.scala} | 2 +-
.../worker/promisehandlers/AssignPortHandler.scala | 4 +-
.../architecture/packaging/test_output_manager.py | 107 +++++++++
.../packaging/test_state_materialization_e2e.py | 258 +++++++++++++++++++++
.../test/python/core/runnables/test_main_loop.py | 157 +++++++++++++
.../core/storage/iceberg/test_iceberg_document.py | 108 ++++++++-
...t_input_port_materialization_reader_runnable.py | 99 ++++++++
.../python/core/storage/test_document_factory.py | 134 +++++++++++
.../pytexera/storage/test_large_binary_manager.py | 1 +
...ala => OutputPortStorageWriterThreadSpec.scala} | 18 +-
common/config/src/main/resources/storage.conf | 3 +
.../amber/config/EnvironmentalVariable.scala | 1 +
.../apache/texera/amber/config/StorageConfig.scala | 3 +
.../amber/core/storage/DocumentFactory.scala | 2 +
.../texera/amber/core/storage/VFSURIFactory.scala | 22 +-
.../amber/core/storage/VFSURIFactorySpec.scala | 23 +-
.../result/iceberg/IcebergDocumentSpec.scala | 83 ++++++-
.../result/iceberg/IcebergTableStatsSpec.scala | 22 +-
34 files changed, 1287 insertions(+), 142 deletions(-)
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index b583f59725..8b0a5ea522 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -633,6 +633,43 @@ jobs:
flags: python
fail_ci_if_error: false
+ python-state-materialization-mac:
+ # Diagnostic leg: cross-region state materialization is reported to
+ # fail on macOS while working on Windows / Linux. The main `python`
+ # job above runs only on ubuntu-latest because it depends on a
+ # postgres service container (service containers don't work on
+ # macOS runners). The state-materialization integration tests use
+ # an in-process sqlite-backed SqlCatalog instead, so we can run
+ # them on macOS without postgres infra. If they fail here but pass
+ # in the main `python` job, we've reproduced the macOS-specific
+ # regression in CI.
+ if: ${{ inputs.run_python }}
+ runs-on: macos-latest
+ steps:
+ - name: Checkout Texera
+ uses: actions/checkout@v5
+ with:
+ ref: ${{ inputs.checkout_ref || github.sha }}
+ fetch-depth: 0
+ - name: Prepare backport workspace
+ if: ${{ inputs.backport_target_branch != '' }}
+ run: bash ./.github/scripts/prepare-backport-checkout.sh "${{
inputs.backport_target_branch }}" "${{ inputs.backport_commit_range }}"
+ - name: Set up Python 3.12
+ uses: actions/setup-python@v6
+ with:
+ python-version: "3.12"
+ - name: Install dependencies
+ run: |
+ python -m pip install uv
+ if [ -f amber/requirements.txt ]; then uv pip install --system
--index-strategy unsafe-best-match -r amber/requirements.txt; fi
+ if [ -f amber/operator-requirements.txt ]; then uv pip install
--system --index-strategy unsafe-best-match -r amber/operator-requirements.txt;
fi
+ if [ -f amber/dev-requirements.txt ]; then uv pip install --system
-r amber/dev-requirements.txt; fi
+ - name: Run state-materialization integration tests
+ run: |
+ cd amber && pytest -sv \
+
src/test/python/core/architecture/packaging/test_state_materialization_e2e.py \
+
src/test/python/core/runnables/test_main_loop.py::TestMainLoop::test_process_start_channel_persists_produce_state_on_start_output
+
agent-service:
if: ${{ inputs.run_agent_service }}
name: ${{ format('agent-service{0} ({1})', inputs.job_name_suffix,
matrix.os) }}
diff --git
a/amber/src/main/python/core/architecture/handlers/control/assign_port_handler.py
b/amber/src/main/python/core/architecture/handlers/control/assign_port_handler.py
index 73ebad26b3..71f1e7f96b 100644
---
a/amber/src/main/python/core/architecture/handlers/control/assign_port_handler.py
+++
b/amber/src/main/python/core/architecture/handlers/control/assign_port_handler.py
@@ -44,10 +44,10 @@ class AssignPortHandler(ControlHandler):
channel_id=channel_id, port_id=req.port_id
)
else:
- storage_uri = None
+ storage_uri_base = None
if len(req.storage_uris) > 0 and req.storage_uris[0]:
- storage_uri = req.storage_uris[0]
+ storage_uri_base = req.storage_uris[0]
self.context.output_manager.add_output_port(
- req.port_id, Schema(raw_schema=req.schema), storage_uri
+ req.port_id, Schema(raw_schema=req.schema), storage_uri_base
)
return EmptyReturn()
diff --git
a/amber/src/main/python/core/architecture/packaging/output_manager.py
b/amber/src/main/python/core/architecture/packaging/output_manager.py
index bf4afbf396..b85e3e39bf 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -45,6 +45,7 @@ from core.models import Tuple, Schema, StateFrame
from core.models.payload import DataPayload, DataFrame
from core.models.state import State
from core.storage.document_factory import DocumentFactory
+from core.storage.vfs_uri_factory import VFSURIFactory
from core.storage.runnables.port_storage_writer import (
PortStorageWriter,
PortStorageWriterElement,
@@ -87,6 +88,10 @@ class OutputManager:
PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
] = dict()
+ self._port_state_writers: typing.Dict[
+ PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
+ ] = dict()
+
def is_missing_output_ports(self):
"""
This method is only used for ensuring correct region execution.
@@ -107,26 +112,30 @@ class OutputManager:
self,
port_id: PortIdentity,
schema: Schema,
- storage_uri: typing.Optional[str] = None,
+ storage_uri_base: typing.Optional[str] = None,
) -> None:
if port_id.id is None:
port_id.id = 0
if port_id.internal is None:
port_id.internal = False
- if storage_uri is not None:
- self.set_up_port_storage_writer(port_id, storage_uri)
+ if storage_uri_base is not None:
+ self.set_up_port_storage_writer(port_id, storage_uri_base)
# each port can only be added and initialized once.
if port_id not in self._ports:
self._ports[port_id] = WorkerPort(schema)
- def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri:
str):
+ def set_up_port_storage_writer(self, port_id: PortIdentity,
storage_uri_base: str):
"""
Create a separate thread for saving output tuples of a port
- to storage in batch.
+ to storage in batch, and open a long-lived buffered writer for
+ state materialization on the same port. `storage_uri_base` is the
+ port's base URI; the result and state URIs are derived from it.
"""
- document, _ = DocumentFactory.open_document(storage_uri)
+ document, _ = DocumentFactory.open_document(
+ VFSURIFactory.result_uri(storage_uri_base)
+ )
buffered_item_writer =
document.writer(str(get_worker_index(self.worker_id)))
writer_queue = Queue()
port_storage_writer = PortStorageWriter(
@@ -144,6 +153,29 @@ class OutputManager:
writer_thread,
)
+ state_document, _ = DocumentFactory.open_document(
+ VFSURIFactory.state_uri(storage_uri_base)
+ )
+ state_buffered_item_writer = state_document.writer(
+ str(get_worker_index(self.worker_id))
+ )
+ state_writer_queue = Queue()
+ state_port_writer = PortStorageWriter(
+ buffered_item_writer=state_buffered_item_writer,
+ queue=state_writer_queue,
+ )
+ state_writer_thread = threading.Thread(
+ target=state_port_writer.run,
+ daemon=True,
+ name=f"port_state_writer_thread_{port_id}",
+ )
+ state_writer_thread.start()
+ self._port_state_writers[port_id] = (
+ state_writer_queue,
+ state_port_writer,
+ state_writer_thread,
+ )
+
def get_port(self, port_id=None) -> WorkerPort:
return list(self._ports.values())[0]
@@ -171,6 +203,20 @@ class OutputManager:
PortStorageWriterElement(data_tuple=tuple_)
)
+ def save_state_to_storage_if_needed(self, state: State, port_id=None) ->
None:
+ # When port_id is omitted the same state row is fanned out to
+ # every output port's state table. This mirrors the
+ # broadcast-to-all-workers behavior on the emit side: state is
+ # shared context, not per-key data, so every downstream operator
+ # (and every worker reading the materialization) needs the full
+ # set.
+ element = PortStorageWriterElement(data_tuple=state.to_tuple())
+ if port_id is None:
+ for writer_queue, _, _ in self._port_state_writers.values():
+ writer_queue.put(element)
+ elif port_id in self._port_state_writers:
+ self._port_state_writers[port_id][0].put(element)
+
def close_port_storage_writers(self) -> None:
"""
Flush the buffers of port storage writers and wait for all the
@@ -184,6 +230,11 @@ class OutputManager:
for _, _, writer_thread in self._port_storage_writers.values():
# This blocking call will wait for all the writer to finish commit
writer_thread.join()
+ for _, state_writer, _ in self._port_state_writers.values():
+ state_writer.stop()
+ for _, _, state_writer_thread in self._port_state_writers.values():
+ state_writer_thread.join()
+ self._port_state_writers.clear()
def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning)
-> None:
"""
diff --git a/amber/src/main/python/core/runnables/main_loop.py
b/amber/src/main/python/core/runnables/main_loop.py
index ab35cda81b..1334af12bf 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -202,6 +202,7 @@ class MainLoop(StoppableQueueBlockingRunnable):
payload=batch,
)
)
+
self.context.output_manager.save_state_to_storage_if_needed(output_state)
def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]:
"""
diff --git a/amber/src/main/python/core/storage/document_factory.py
b/amber/src/main/python/core/storage/document_factory.py
index 9b686ab66b..bd690ceb59 100644
--- a/amber/src/main/python/core/storage/document_factory.py
+++ b/amber/src/main/python/core/storage/document_factory.py
@@ -61,30 +61,35 @@ class DocumentFactory:
if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
_, _, _, resource_type = VFSURIFactory.decode_uri(uri)
- if resource_type in {VFSResourceType.RESULT}:
- storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
-
- # Convert Amber Schema to Iceberg Schema with LARGE_BINARY
- # field name encoding
- iceberg_schema = amber_schema_to_iceberg_schema(schema)
-
- create_table(
- IcebergCatalogInstance.get_instance(),
- StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
- storage_key,
- iceberg_schema,
- override_if_exists=True,
- )
-
- return IcebergDocument[Tuple](
- StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
- storage_key,
- iceberg_schema,
- amber_tuples_to_arrow_table,
- arrow_table_to_amber_tuples,
- )
- else:
- raise ValueError(f"Resource type {resource_type} is not
supported")
+ match resource_type:
+ case VFSResourceType.RESULT:
+ namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+ case VFSResourceType.STATE:
+ namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+ case _:
+ raise ValueError(f"Resource type {resource_type} is not
supported")
+
+ storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
+ # Convert Amber Schema to Iceberg Schema with LARGE_BINARY
+ # field name encoding
+ iceberg_schema = amber_schema_to_iceberg_schema(schema)
+
+ create_table(
+ IcebergCatalogInstance.get_instance(),
+ namespace,
+ storage_key,
+ iceberg_schema,
+ override_if_exists=True,
+ )
+
+ return IcebergDocument[Tuple](
+ namespace,
+ storage_key,
+ iceberg_schema,
+ amber_tuples_to_arrow_table,
+ arrow_table_to_amber_tuples,
+ )
+
else:
raise NotImplementedError(
f"Unsupported URI scheme: {parsed_uri.scheme} for creating the
document"
@@ -96,30 +101,36 @@ class DocumentFactory:
if parsed_uri.scheme == "vfs":
_, _, _, resource_type = VFSURIFactory.decode_uri(uri)
- if resource_type in {VFSResourceType.RESULT}:
- storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
-
- table = load_table_metadata(
- IcebergCatalogInstance.get_instance(),
- StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
- storage_key,
- )
-
- if table is None:
- raise ValueError("No storage is found for the given URI")
-
- amber_schema = Schema(table.schema().as_arrow())
-
- document = IcebergDocument(
- StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
- storage_key,
- table.schema(),
- amber_tuples_to_arrow_table,
- arrow_table_to_amber_tuples,
- )
- return document, amber_schema
- else:
- raise ValueError(f"Resource type {resource_type} is not
supported")
+ match resource_type:
+ case VFSResourceType.RESULT:
+ namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+ case VFSResourceType.STATE:
+ namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+ case _:
+ raise ValueError(f"Resource type {resource_type} is not
supported")
+
+ storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
+
+ table = load_table_metadata(
+ IcebergCatalogInstance.get_instance(),
+ namespace,
+ storage_key,
+ )
+
+ if table is None:
+ raise ValueError("No storage is found for the given URI")
+
+ amber_schema = Schema(table.schema().as_arrow())
+
+ document = IcebergDocument(
+ namespace,
+ storage_key,
+ table.schema(),
+ amber_tuples_to_arrow_table,
+ arrow_table_to_amber_tuples,
+ )
+ return document, amber_schema
+
else:
raise NotImplementedError(
f"Unsupported URI scheme: {parsed_uri.scheme} for opening the
document"
diff --git
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index 6122bbb8b9..3e0e2d48ab 100644
---
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -34,9 +34,10 @@ from
core.architecture.sendsemantics.range_based_shuffle_partitioner import (
from core.architecture.sendsemantics.round_robin_partitioner import (
RoundRobinPartitioner,
)
-from core.models import Tuple, InternalQueue, DataFrame, DataPayload
+from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State,
StateFrame
from core.models.internal_queue import DataElement, ECMElement
from core.storage.document_factory import DocumentFactory
+from core.storage.vfs_uri_factory import VFSURIFactory
from core.util import Stoppable, get_one_of
from core.util.runnable import Runnable
from core.util.virtual_identity import get_from_actor_id_for_input_port_storage
@@ -132,14 +133,28 @@ class InputPortMaterializationReaderRunnable(Runnable,
Stoppable):
emits an EndChannel ECM. Use the same partitioner implementation as
that in
output manager, where a tuple is batched by the partitioner and only
selected as the input of this worker according to the partitioner.
+
+ States and tuples are persisted to separate tables, so the original
+ interleaving is lost and replay has to pick an order: we replay states
+ first because downstream operators typically need their state set up
+ before they process the incoming tuples. Every state is broadcast to
+ every downstream worker -- no partitioner filtering, unlike the tuple
+ loop. State is shared context (e.g. config / counters), not per-key
+ data, so each worker needs the full set.
"""
try:
self.materialization, self.tuple_schema =
DocumentFactory.open_document(
- self.uri
+ VFSURIFactory.result_uri(self.uri)
)
self.emit_ecm("StartChannel",
EmbeddedControlMessageType.NO_ALIGNMENT)
- storage_iterator = self.materialization.get()
+ state_document, _ = DocumentFactory.open_document(
+ VFSURIFactory.state_uri(self.uri)
+ )
+ for state_row in state_document.get():
+ self.emit_payload(StateFrame(State.from_tuple(state_row)))
+
+ storage_iterator = self.materialization.get()
# Iterate and process tuples.
for tup in storage_iterator:
if self._stopped:
diff --git a/amber/src/main/python/core/storage/storage_config.py
b/amber/src/main/python/core/storage/storage_config.py
index 0e47bdb71a..8233590987 100644
--- a/amber/src/main/python/core/storage/storage_config.py
+++ b/amber/src/main/python/core/storage/storage_config.py
@@ -32,6 +32,7 @@ class StorageConfig:
ICEBERG_REST_CATALOG_URI = None
ICEBERG_REST_CATALOG_WAREHOUSE_NAME = None
ICEBERG_TABLE_RESULT_NAMESPACE = None
+ ICEBERG_TABLE_STATE_NAMESPACE = None
ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None
ICEBERG_TABLE_COMMIT_BATCH_SIZE = None
@@ -51,6 +52,7 @@ class StorageConfig:
rest_catalog_uri,
rest_catalog_warehouse_name,
table_result_namespace,
+ table_state_namespace,
directory_path,
commit_batch_size,
s3_endpoint,
@@ -71,6 +73,7 @@ class StorageConfig:
cls.ICEBERG_REST_CATALOG_WAREHOUSE_NAME = rest_catalog_warehouse_name
cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace
+ cls.ICEBERG_TABLE_STATE_NAMESPACE = table_state_namespace
cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path
cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size)
diff --git a/amber/src/main/python/core/storage/vfs_uri_factory.py
b/amber/src/main/python/core/storage/vfs_uri_factory.py
index de0c5db56e..883450abf2 100644
--- a/amber/src/main/python/core/storage/vfs_uri_factory.py
+++ b/amber/src/main/python/core/storage/vfs_uri_factory.py
@@ -34,6 +34,7 @@ class VFSResourceType(str, Enum):
RESULT = "result"
RUNTIME_STATISTICS = "runtimeStatistics"
CONSOLE_MESSAGES = "consoleMessages"
+ STATE = "state"
class VFSURIFactory:
@@ -88,12 +89,22 @@ class VFSURIFactory:
)
@staticmethod
- def create_result_uri(workflow_id, execution_id, global_port_id) -> str:
- """Creates a URI pointing to a result storage."""
- base_uri = (
+ def create_port_base_uri(workflow_id, execution_id, global_port_id) -> str:
+ """Base URI for a port. Result and state URIs derive from it via
+ `result_uri` / `state_uri`.
+ """
+ return (
f"{VFSURIFactory.VFS_FILE_URI_SCHEME}:///wid/{workflow_id.id}"
f"/eid/{execution_id.id}/globalportid/"
f"{serialize_global_port_identity(global_port_id)}"
)
+ @staticmethod
+ def result_uri(base_uri: str) -> str:
+ """The result-resource URI under a port base URI."""
return f"{base_uri}/{VFSResourceType.RESULT.value}"
+
+ @staticmethod
+ def state_uri(base_uri: str) -> str:
+ """The state-resource URI under a port base URI."""
+ return f"{base_uri}/{VFSResourceType.STATE.value}"
diff --git a/amber/src/main/python/texera_run_python_worker.py
b/amber/src/main/python/texera_run_python_worker.py
index 8687298f81..9b21fa5334 100644
--- a/amber/src/main/python/texera_run_python_worker.py
+++ b/amber/src/main/python/texera_run_python_worker.py
@@ -52,6 +52,7 @@ if __name__ == "__main__":
iceberg_rest_catalog_uri,
iceberg_rest_catalog_warehouse_name,
iceberg_table_namespace,
+ iceberg_table_state_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
s3_endpoint,
@@ -68,6 +69,7 @@ if __name__ == "__main__":
iceberg_rest_catalog_uri,
iceberg_rest_catalog_warehouse_name,
iceberg_table_namespace,
+ iceberg_table_state_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
s3_endpoint,
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index 095315590f..030fa3a3bb 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -20,7 +20,7 @@
package org.apache.texera.amber.engine.architecture.messaginglayer
import org.apache.texera.amber.core.state.State
-import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
import org.apache.texera.amber.core.storage.model.BufferedItemWriter
import org.apache.texera.amber.core.tuple._
import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity,
ChannelIdentity}
@@ -33,7 +33,7 @@ import
org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.
import org.apache.texera.amber.engine.architecture.sendsemantics.partitioners._
import
org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._
import org.apache.texera.amber.engine.architecture.worker.managers.{
- OutputPortResultWriterThread,
+ OutputPortStorageWriterThread,
PortStorageWriterTerminateSignal
}
import org.apache.texera.amber.engine.common.AmberLogging
@@ -121,7 +121,10 @@ class OutputManager(
mutable.HashMap[(PhysicalLink, ActorVirtualIdentity),
NetworkOutputBuffer]()
private val outputPortResultWriterThreads
- : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
+ : mutable.HashMap[PortIdentity, OutputPortStorageWriterThread] =
+ mutable.HashMap()
+
+ private val stateWriterThreads: mutable.HashMap[PortIdentity,
OutputPortStorageWriterThread] =
mutable.HashMap()
/**
@@ -191,19 +194,20 @@ class OutputManager(
def emitState(state: State): Unit = {
networkOutputBuffers.foreach(kv => kv._2.sendState(state))
+ saveStateToStorageIfNeeded(state)
}
- def addPort(portId: PortIdentity, schema: Schema, storageURIOption:
Option[URI]): Unit = {
+ def addPort(portId: PortIdentity, schema: Schema, storageURIBaseOption:
Option[URI]): Unit = {
// each port can only be added and initialized once.
if (this.ports.contains(portId)) {
return
}
this.ports(portId) = WorkerPort(schema)
- // if a storage URI is provided, set up a storage writer thread
- storageURIOption match {
- case Some(storageUri) => setupOutputStorageWriterThread(portId,
storageUri)
- case None => // No need to add a writer
+ // if a storage URI base is provided, set up storage writer threads
+ storageURIBaseOption match {
+ case Some(portBaseURI) => setupOutputStorageWriterThread(portId,
portBaseURI)
+ case None => // No need to add a writer
}
}
@@ -232,6 +236,15 @@ class OutputManager(
})
}
+ private def saveStateToStorageIfNeeded(state: State): Unit = {
+ // The same state row is fanned out to every output port's state
+ // table. This mirrors the broadcast-to-all-workers behavior on the
+ // emit side: state is shared context, not per-key data, so every
+ // downstream operator (and every worker reading the materialization)
+ // needs the full set.
+ stateWriterThreads.values.foreach(_.queue.put(Left(state.toTuple)))
+ }
+
/**
* Singal the port storage writer to flush the remaining buffer and wait
for commits to finish so that
* the output port is properly completed. If the output port does not need
storage, no action will be done.
@@ -251,7 +264,11 @@ class OutputManager(
writerThread.getFailure.foreach(throw _)
case None =>
}
-
+ this.stateWriterThreads.remove(outputPortId).foreach { writerThread =>
+ writerThread.queue.put(Right(PortStorageWriterTerminateSignal))
+ writerThread.join()
+ writerThread.getFailure.foreach(throw _)
+ }
}
def getPort(portId: PortIdentity): WorkerPort = ports(portId)
@@ -285,9 +302,9 @@ class OutputManager(
ports.head._1
}
- private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri:
URI): Unit = {
+ private def setupOutputStorageWriterThread(portId: PortIdentity,
portBaseURI: URI): Unit = {
val bufferedItemWriter = DocumentFactory
- .openDocument(storageUri)
+ .openDocument(VFSURIFactory.resultURI(portBaseURI))
._1
.writer(
VirtualIdentityUtils
@@ -300,9 +317,20 @@ class OutputManager(
.toString
)
.asInstanceOf[BufferedItemWriter[Tuple]]
- val writerThread = new OutputPortResultWriterThread(bufferedItemWriter)
+ val writerThread = new OutputPortStorageWriterThread(bufferedItemWriter)
this.outputPortResultWriterThreads(portId) = writerThread
writerThread.start()
+
+ // The state document is provisioned alongside the result document
+ // by RegionExecutionCoordinator, so it is always present.
+ val stateWriter = DocumentFactory
+ .openDocument(VFSURIFactory.stateURI(portBaseURI))
+ ._1
+ .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+ .asInstanceOf[BufferedItemWriter[Tuple]]
+ val stateWriterThread = new OutputPortStorageWriterThread(stateWriter)
+ this.stateWriterThreads(portId) = stateWriterThread
+ stateWriterThread.start()
}
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
index 4ff5ff15ae..3358e31e65 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
@@ -187,6 +187,7 @@ class PythonWorkflowWorker(
if (isRest) StorageConfig.icebergRESTCatalogUri else "",
if (isRest) StorageConfig.icebergRESTCatalogWarehouseName else "",
StorageConfig.icebergTableResultNamespace,
+ StorageConfig.icebergTableStateNamespace,
StorageConfig.fileStorageDirectoryPath.toString,
StorageConfig.icebergTableCommitBatchSize.toString,
StorageConfig.s3Endpoint,
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index 401ccddc0a..43e8d281ce 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -20,7 +20,7 @@
package org.apache.texera.amber.engine.architecture.scheduling
import org.apache.texera.amber.config.ApplicationConfig
-import org.apache.texera.amber.core.storage.VFSURIFactory.createResultURI
+import org.apache.texera.amber.core.storage.VFSURIFactory.createPortBaseURI
import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity,
PhysicalOpIdentity}
import org.apache.texera.amber.core.workflow._
import
org.apache.texera.amber.engine.architecture.scheduling.SchedulingUtils.replaceVertex
@@ -174,12 +174,12 @@ class CostBasedScheduleGenerator(
// Allocate an URI for each of these output ports
val outputPortConfigs: Map[GlobalPortIdentity, OutputPortConfig] =
outputPortIdsNeedingStorage.map { gpid =>
- val outputWriterURI = createResultURI(
+ val portBaseURI = createPortBaseURI(
workflowId = workflowContext.workflowId,
executionId = workflowContext.executionId,
globalPortId = gpid
)
- gpid -> OutputPortConfig(outputWriterURI)
+ gpid -> OutputPortConfig(portBaseURI)
}.toMap
val resourceConfig = ResourceConfig(portConfigs = outputPortConfigs)
@@ -237,7 +237,7 @@ class CostBasedScheduleGenerator(
s"the outout port $globalOutputPortId has not been
assigned a URI yet."
)
)
- .storageURI
+ .storageURIBase
// Group all available URIs of this input port together
acc.updated(
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
index 4bb8933896..304e1496f8 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.scheduling
import com.typesafe.scalalogging.LazyLogging
import org.apache.texera.amber.core.WorkflowRuntimeException
-import org.apache.texera.amber.core.storage.VFSURIFactory.createResultURI
+import org.apache.texera.amber.core.storage.VFSURIFactory.createPortBaseURI
import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity
import org.apache.texera.amber.core.workflow.{
GlobalPortIdentity,
@@ -331,7 +331,7 @@ class ExpansionGreedyScheduleGenerator(
private def getStorageURIFromGlobalOutputPortId(outputPortId:
GlobalPortIdentity) = {
assert(!outputPortId.input)
- createResultURI(
+ createPortBaseURI(
workflowId = workflowContext.workflowId,
executionId = workflowContext.executionId,
globalPortId = outputPortId
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 2971e4c4f4..5a9df11b58 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -21,7 +21,8 @@ package org.apache.texera.amber.engine.architecture.scheduling
import org.apache.pekko.pattern.gracefulStop
import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer,
Return, Throw, Timer}
-import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.state.State
+import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
import org.apache.texera.amber.core.workflow.{GlobalPortIdentity,
PhysicalLink, PhysicalOp}
@@ -465,7 +466,7 @@ class RegionExecutionCoordinator(
opId = physicalOp.id,
portId = outputPortId
) =>
- cfg.storageURI.toString
+ cfg.storageURIBase.toString
}
.getOrElse("")
Some(
@@ -568,18 +569,21 @@ class RegionExecutionCoordinator(
): Unit = {
portConfigs.foreach {
case (outputPortId, portConfig) =>
- val storageUriToAdd = portConfig.storageURI
- val (_, eid, _, _) = decodeURI(storageUriToAdd)
+ val portBaseURI = portConfig.storageURIBase
+ val resultURI = VFSURIFactory.resultURI(portBaseURI)
+ val stateURI = VFSURIFactory.stateURI(portBaseURI)
val schemaOptional =
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
val schema =
schemaOptional.getOrElse(throw new IllegalStateException("Schema is
missing"))
- DocumentFactory.createDocument(storageUriToAdd, schema)
+ DocumentFactory.createDocument(resultURI, schema)
+ DocumentFactory.createDocument(stateURI, State.schema)
if (!isRestart) {
+ val (_, eid, _, _) = decodeURI(resultURI)
WorkflowExecutionsResource.insertOperatorPortResultUri(
eid = eid,
globalPortId = outputPortId,
- uri = storageUriToAdd
+ uri = resultURI
)
}
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala
index b4a1e058b4..56743ae095 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/PortConfig.scala
@@ -31,9 +31,13 @@ sealed trait PortConfig {
def storageURIs: List[URI]
}
-/** An output port requires exactly one materialization URI. */
-final case class OutputPortConfig(storageURI: URI) extends PortConfig {
- override val storageURIs: List[URI] = List(storageURI)
+/**
+ * Configuration of an output port: the single base URI under which the
+ * port is materialized. The concrete result and state documents are derived
+ * from it with `VFSURIFactory.resultURI` / `VFSURIFactory.stateURI`, so this
+ * value itself must not be handed directly to `DocumentFactory`.
+ *
+ * @param storageURIBase base URI of the port's materialization
+ */
+final case class OutputPortConfig(storageURIBase: URI) extends PortConfig {
+  override val storageURIs: List[URI] = storageURIBase :: Nil
+}
/**
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
index 10fbbc44a2..428d9fb48c 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
@@ -21,7 +21,8 @@ package
org.apache.texera.amber.engine.architecture.worker.managers
import io.grpc.MethodDescriptor
import org.apache.texera.amber.config.ApplicationConfig
-import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.state.State
+import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
import org.apache.texera.amber.core.storage.model.VirtualDocument
import org.apache.texera.amber.core.tuple.Tuple
import org.apache.texera.amber.core.virtualidentity.{
@@ -45,7 +46,11 @@ import
org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{
DPInputQueueElement,
FIFOMessageElement
}
-import org.apache.texera.amber.engine.common.ambermessage.{DataFrame,
WorkflowFIFOMessage}
+import org.apache.texera.amber.engine.common.ambermessage.{
+ DataFrame,
+ StateFrame,
+ WorkflowFIFOMessage
+}
import
org.apache.texera.amber.util.VirtualIdentityUtils.getFromActorIdForInputPortStorage
import java.net.URI
@@ -78,14 +83,35 @@ class InputPortMaterializationReaderThread(
def finished: Boolean = isFinished.get()
/**
- * Read from the materialization stoage, and mimcs the behavior of an
upstream worker's output manager.
+ * Read from the materialization storage, and mimics the behavior of an
upstream worker's output manager.
+ *
+ * States and tuples are persisted to separate tables, so the original
+ * interleaving is lost and replay has to pick an order: we replay states
+ * first because downstream operators typically need their state set up
+ * before they process the incoming tuples. Every state is broadcast to
+ * every downstream worker -- no partitioner filtering, unlike the tuple
+ * loop. State is shared context (e.g. config / counters), not per-key
+ * data, so each worker needs the full set.
*/
override def run(): Unit = {
// Notify the input port of start of input channel
emitECM(METHOD_START_CHANNEL, NO_ALIGNMENT)
try {
+ val stateDocument =
+ DocumentFactory
+ .openDocument(VFSURIFactory.stateURI(uri))
+ ._1
+ .asInstanceOf[VirtualDocument[Tuple]]
+ val stateReadIterator = stateDocument.get()
+ while (stateReadIterator.hasNext) {
+ val state = State.fromTuple(stateReadIterator.next())
+ inputMessageQueue.put(
+ FIFOMessageElement(WorkflowFIFOMessage(channelId, getSequenceNumber,
StateFrame(state)))
+ )
+ }
+
val materialization: VirtualDocument[Tuple] = DocumentFactory
- .openDocument(uri)
+ .openDocument(VFSURIFactory.resultURI(uri))
._1
.asInstanceOf[VirtualDocument[Tuple]]
val storageReadIterator = materialization.get()
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortStorageWriterThread.scala
similarity index 98%
rename from
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
rename to
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortStorageWriterThread.scala
index 4223d920da..bcabde0089 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortStorageWriterThread.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
sealed trait TerminateSignal
case object PortStorageWriterTerminateSignal extends TerminateSignal
-class OutputPortResultWriterThread(
+class OutputPortStorageWriterThread(
bufferedItemWriter: BufferedItemWriter[Tuple]
) extends Thread {
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala
index fe959733ab..9e2c7f7c2b 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala
@@ -58,11 +58,11 @@ trait AssignPortHandler {
dp.stateManager.assertState(READY, RUNNING, PAUSED)
}
} else {
- val storageURIOption: Option[URI] = msg.storageUris.head match {
+ val storageURIBaseOption: Option[URI] = msg.storageUris.head match {
case "" => None
case uriString => Some(URI.create(uriString))
}
- dp.outputManager.addPort(msg.portId, schema, storageURIOption)
+ dp.outputManager.addPort(msg.portId, schema, storageURIBaseOption)
}
EmptyReturn()
}
diff --git
a/amber/src/test/python/core/architecture/packaging/test_output_manager.py
b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
new file mode 100644
index 0000000000..dcf7ccde67
--- /dev/null
+++ b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from core.architecture.packaging.output_manager import OutputManager
+from core.models.state import State
+from core.storage.runnables.port_storage_writer import PortStorageWriterElement
+from proto.org.apache.texera.amber.core import PortIdentity
+
+
+def _stub_state_writer(output_manager, port_id):
+    """Register mock (queue, writer, thread) objects for `port_id`, as if a
+    state writer had been set up for that port, and return them."""
+    stub = (MagicMock(), MagicMock(), MagicMock())
+    output_manager._port_state_writers[port_id] = stub
+    return stub
+
+
+class TestSaveStateToStorageIfNeeded:
+    """Unit tests for OutputManager.save_state_to_storage_if_needed and
+    close_port_storage_writers, with the per-port state writers mocked."""
+
+    @pytest.fixture
+    def output_manager(self):
+        return OutputManager(worker_id="Worker:WF0-test-main-0")
+
+    @pytest.fixture
+    def port_a(self):
+        return PortIdentity(id=0, internal=False)
+
+    @pytest.fixture
+    def port_b(self):
+        return PortIdentity(id=1, internal=False)
+
+    @pytest.fixture
+    def state(self):
+        return State({"loop_counter": 1, "i": 2})
+
+    def test_no_state_writers_is_a_noop(self, output_manager, state):
+        # With no port set up, saving must neither raise nor register a
+        # state writer as a side effect.
+        output_manager.save_state_to_storage_if_needed(state)
+        assert output_manager._port_state_writers == {}
+
+    def test_unknown_port_id_is_a_noop(
+        self, output_manager, state, port_a, port_b
+    ):
+        # Asking for a port that has no state writer must not fall back to
+        # writing through the ports that do have one.
+        queue_b, _, _ = _stub_state_writer(output_manager, port_b)
+
+        output_manager.save_state_to_storage_if_needed(state, port_id=port_a)
+
+        queue_b.put.assert_not_called()
+        assert port_a not in output_manager._port_state_writers
+
+    def test_enqueues_to_every_port_when_port_id_omitted(
+        self, output_manager, state, port_a, port_b
+    ):
+        queue_a, writer_a, _ = _stub_state_writer(output_manager, port_a)
+        queue_b, writer_b, _ = _stub_state_writer(output_manager, port_b)
+
+        output_manager.save_state_to_storage_if_needed(state)
+
+        # Each port's writer queue receives exactly one PortStorageWriterElement.
+        for queue in (queue_a, queue_b):
+            assert queue.put.call_count == 1
+            assert isinstance(queue.put.call_args.args[0], PortStorageWriterElement)
+        # Saving is non-blocking: put_one / close on the buffered writer run on
+        # the writer thread, so this call must not invoke them directly.
+        for writer in (writer_a, writer_b):
+            writer.put_one.assert_not_called()
+            writer.close.assert_not_called()
+
+    def test_enqueues_only_to_selected_port_when_port_id_specified(
+        self, output_manager, state, port_a, port_b
+    ):
+        queue_a, _, _ = _stub_state_writer(output_manager, port_a)
+        queue_b, _, _ = _stub_state_writer(output_manager, port_b)
+
+        output_manager.save_state_to_storage_if_needed(state, port_id=port_a)
+
+        assert queue_a.put.call_count == 1
+        queue_b.put.assert_not_called()
+
+    def test_close_port_storage_writers_stops_state_threads(
+        self, output_manager, port_a, port_b
+    ):
+        # Once the port completes, every state-writer thread must be stopped
+        # and joined so that the buffered writer's close() (which flushes the
+        # final Iceberg commit) actually runs.
+        _, writer_a, thread_a = _stub_state_writer(output_manager, port_a)
+        _, writer_b, thread_b = _stub_state_writer(output_manager, port_b)
+
+        output_manager.close_port_storage_writers()
+
+        writer_a.stop.assert_called_once()
+        writer_b.stop.assert_called_once()
+        thread_a.join.assert_called_once()
+        thread_b.join.assert_called_once()
+        assert output_manager._port_state_writers == {}
diff --git
a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
new file mode 100644
index 0000000000..9d1fd30698
--- /dev/null
+++
b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
@@ -0,0 +1,258 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+End-to-end integration test for cross-region state materialization.
+
+The unit tests for `OutputManager.save_state_to_storage_if_needed` and
+`InputPortMaterializationReaderRunnable.run` mock out the iceberg layer,
+so a regression in the writer/storage/reader join is invisible to them.
+This test wires:
+
+ OutputManager.set_up_port_storage_writer(port, base_uri)
+ → real PortStorageWriter thread
+ → real IcebergTableWriter (sqlite-backed SqlCatalog)
+ → state document at VFSURIFactory.state_uri(base_uri)
+ → InputPortMaterializationReaderRunnable.run()
+ → DataElement(StateFrame) on the consumer's input queue
+
+and asserts that a state put through `save_state_to_storage_if_needed`
+on the producer side actually arrives at the consumer's queue, with the
+same payload.
+"""
+
+import tempfile
+import threading
+import uuid
+
+import pytest
+from pyiceberg.catalog.sql import SqlCatalog
+
+from core.architecture.packaging.output_manager import OutputManager
+from core.models import State, StateFrame
+from core.models.internal_queue import DataElement, InternalQueue
+from core.storage.document_factory import DocumentFactory
+from core.storage.iceberg.iceberg_catalog_instance import
IcebergCatalogInstance
+from core.storage.runnables.input_port_materialization_reader_runnable import (
+ InputPortMaterializationReaderRunnable,
+)
+from core.storage.storage_config import StorageConfig
+from core.storage.vfs_uri_factory import VFSURIFactory
+from proto.org.apache.texera.amber.core import (
+ ActorVirtualIdentity,
+ ChannelIdentity,
+ ExecutionIdentity,
+ GlobalPortIdentity,
+ OperatorIdentity,
+ PhysicalOpIdentity,
+ PortIdentity,
+ WorkflowIdentity,
+)
+from proto.org.apache.texera.amber.engine.architecture.sendsemantics import (
+ OneToOnePartitioning,
+ Partitioning,
+)
+
+
+# Scratch directory shared by every test in this module. It holds both the
+# sqlite catalog database and the iceberg warehouse files; nothing in this
+# module deletes it afterwards. StorageConfig is deliberately NOT initialized
+# here: other test modules (e.g. test_iceberg_document.py) call
+# `StorageConfig.initialize` at import time, and the class rejects a second
+# initialization with RuntimeError. Whichever module is collected first wins,
+# and the fixture below adopts its namespaces.
+_WAREHOUSE_PREFIX = "texera-state-e2e-warehouse-"
+_WAREHOUSE_DIR = tempfile.mkdtemp(prefix=_WAREHOUSE_PREFIX)
+
+
+@pytest.fixture(scope="module", autouse=True)
+def sqlite_iceberg_catalog():
+    """Swap in a sqlite-backed SqlCatalog so these tests need no external
+    iceberg infrastructure (postgres / minio).
+
+    Module-scoped: every test in this file shares one warehouse, and the
+    namespaces are created only once. The original `IcebergCatalogInstance`
+    singleton is saved and restored on teardown, so other test modules that
+    expect the real postgres-backed catalog (e.g. test_iceberg_document.py)
+    are not affected by the replacement.
+
+    Yields:
+        The SqlCatalog installed as the active iceberg catalog.
+    """
+    # StorageConfig allows a single initialization, and another test module
+    # may already have performed it. Only initialize when nobody has; the
+    # values are placeholders because the catalog instance is replaced below,
+    # so the postgres / rest fields are never exercised.
+    if not StorageConfig._initialized:
+        StorageConfig.initialize(
+            catalog_type="postgres",
+            postgres_uri_without_scheme="unused",
+            postgres_username="unused",
+            postgres_password="unused",
+            rest_catalog_uri="unused",
+            rest_catalog_warehouse_name="unused",
+            table_result_namespace="operator-port-result",
+            table_state_namespace="operator-port-state",
+            directory_path=_WAREHOUSE_DIR,
+            commit_batch_size=4096,
+            s3_endpoint="unused",
+            s3_region="unused",
+            s3_auth_username="unused",
+            s3_auth_password="unused",
+        )
+
+    original_instance = IcebergCatalogInstance._instance
+    db_path = f"{_WAREHOUSE_DIR}/catalog.sqlite"
+    catalog = SqlCatalog(
+        "texera_iceberg_e2e",
+        **{
+            "uri": f"sqlite:///{db_path}",
+            "warehouse": f"file://{_WAREHOUSE_DIR}",
+        },
+    )
+    # Create whichever namespaces StorageConfig ended up with -- those are
+    # the ones DocumentFactory will route into.
+    catalog.create_namespace_if_not_exists(
+        StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+    )
+    catalog.create_namespace_if_not_exists(
+        StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+    )
+    IcebergCatalogInstance.replace_instance(catalog)
+    try:
+        yield catalog
+    finally:
+        IcebergCatalogInstance.replace_instance(original_instance)
+
+
+def _fresh_base_uri() -> str:
+    """Build a port-base URI that no other test uses, so iceberg tables never
+    collide between tests that share the module-scoped catalog."""
+    unique_operator = OperatorIdentity(id=f"e2e-{uuid.uuid4().hex}")
+    output_port = GlobalPortIdentity(
+        op_id=PhysicalOpIdentity(logical_op_id=unique_operator, layer_name="main"),
+        port_id=PortIdentity(id=0, internal=False),
+        input=False,
+    )
+    return VFSURIFactory.create_port_base_uri(
+        WorkflowIdentity(id=0), ExecutionIdentity(id=0), output_port
+    )
+
+
+def test_state_written_by_output_manager_is_replayed_by_reader():
+    """A state saved on the producer through OutputManager must reach the
+    consumer's input queue, unchanged, when the consumer replays the port
+    through InputPortMaterializationReaderRunnable.
+    """
+    base_uri = _fresh_base_uri()
+    port_id = PortIdentity(id=0, internal=False)
+    # Only the state table matters on the producer side, so the result table
+    # simply reuses the state schema.
+    result_schema = State.SCHEMA
+
+    # Step 1 -- emulate RegionExecutionCoordinator, which provisions both the
+    # result and the state document at the port base URI before any worker
+    # starts.
+    result_uri = VFSURIFactory.result_uri(base_uri)
+    DocumentFactory.create_document(result_uri, result_schema)
+    state_uri = VFSURIFactory.state_uri(base_uri)
+    DocumentFactory.create_document(state_uri, State.SCHEMA)
+
+    # Step 2 -- producer: a real OutputManager whose state and result writer
+    # threads target the iceberg storage.
+    producer = OutputManager(worker_id="Worker:WF0-test-producer-main-0")
+    producer.add_output_port(port_id, schema=result_schema, storage_uri_base=base_uri)
+
+    # Step 3 -- push one state through the producer-side save path.
+    state = State({"flag": True, "loop_counter": 7, "name": "outer"})
+    producer.save_state_to_storage_if_needed(state)
+
+    # Step 4 -- closing the writers flushes and commits the buffered rows;
+    # until then the state only lives in the writer's memory buffer.
+    producer.close_port_storage_writers()
+
+    # Step 5 -- consumer: replay the same base URI. The reader always needs a
+    # partitioning, even without a real downstream worker, so give it a
+    # one-to-one partitioning whose sole receiver is the consumer itself.
+    consumer_worker = ActorVirtualIdentity(name="consumer-worker-0")
+    consumer_queue = InternalQueue()
+    only_channel = ChannelIdentity(
+        from_worker_id=ActorVirtualIdentity(name="producer-worker-0"),
+        to_worker_id=consumer_worker,
+        is_control=False,
+    )
+    reader = InputPortMaterializationReaderRunnable(
+        uri=base_uri,
+        queue=consumer_queue,
+        worker_actor_id=consumer_worker,
+        partitioning=Partitioning(
+            one_to_one_partitioning=OneToOnePartitioning(
+                batch_size=400, channels=[only_channel]
+            )
+        ),
+    )
+
+    # Run the reader on its own daemon thread so a hang surfaces as a
+    # timeout instead of blocking the whole test run.
+    reader_thread = threading.Thread(target=reader.run, daemon=True)
+    reader_thread.start()
+    reader_thread.join(timeout=30)
+    assert not reader_thread.is_alive(), "reader did not finish within timeout"
+    assert reader.finished(), "reader exited but did not mark itself finished"
+
+    # Step 6 -- drain the consumer queue, keeping only the replayed states.
+    state_frames: list[State] = []
+    while not consumer_queue.is_empty():
+        element = consumer_queue.get()
+        is_state = isinstance(element, DataElement) and isinstance(
+            element.payload, StateFrame
+        )
+        if is_state:
+            state_frames.append(element.payload.frame)
+
+    assert len(state_frames) == 1, (
+        f"expected exactly one State to flow through writer→iceberg→reader; "
+        f"got {len(state_frames)}: {state_frames}"
+    )
+    assert state_frames[0] == state, (
+        f"replayed state did not match what was written; "
+        f"wrote={state}, read={state_frames[0]}"
+    )
+
+
+def test_state_table_persists_across_writer_close():
+    """Check the iceberg state table directly, without going through the
+    reader. If this passes while the reader round-trip test fails, the fault
+    is in the reader / consumer wiring; if this fails, it is in the writer /
+    storage layer.
+    """
+    base_uri = _fresh_base_uri()
+    result_uri = VFSURIFactory.result_uri(base_uri)
+    state_uri = VFSURIFactory.state_uri(base_uri)
+    for document_uri in (result_uri, state_uri):
+        DocumentFactory.create_document(document_uri, State.SCHEMA)
+
+    producer = OutputManager(worker_id="Worker:WF0-test-producer2-main-0")
+    producer.add_output_port(
+        PortIdentity(id=0, internal=False),
+        schema=State.SCHEMA,
+        storage_uri_base=base_uri,
+    )
+
+    state = State({"flag": False, "checkpoint": 42})
+    producer.save_state_to_storage_if_needed(state)
+    producer.close_port_storage_writers()
+
+    # Open the state document directly; the reader runnable is bypassed.
+    state_document, _ = DocumentFactory.open_document(state_uri)
+    rows = [*state_document.get()]
+    assert len(rows) == 1, (
+        f"expected exactly one row in the iceberg state table after the "
+        f"writer was closed; got {len(rows)} rows"
+    )
+    assert State.from_tuple(rows[0]) == state
diff --git a/amber/src/test/python/core/runnables/test_main_loop.py
b/amber/src/test/python/core/runnables/test_main_loop.py
index 400a7f2a90..c32f45b888 100644
--- a/amber/src/test/python/core/runnables/test_main_loop.py
+++ b/amber/src/test/python/core/runnables/test_main_loop.py
@@ -1393,6 +1393,163 @@ class TestMainLoop:
assert second_output.payload.frame["value"] == 42
assert second_output.payload.frame["port"] == 0
+    @pytest.mark.timeout(2)
+    def test_process_input_state_persists_output_state_to_storage(
+        self,
+        main_loop,
+        mock_data_output_channel,
+        monkeypatch,
+    ):
+        # Every output state that _process_state emits downstream must also
+        # be handed to save_state_to_storage_if_needed, so it is durable on
+        # the upstream output port.
+        class DummyExecutor:
+            @staticmethod
+            def process_state(state: State, port: int) -> State:
+                return State({"value": state["value"] + 1, "port": port})
+
+        context = main_loop.context
+        saved_states: list[State] = []
+        context.executor_manager.executor = DummyExecutor()
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None)
+
+        def fake_emit_state(state):
+            return [(mock_data_output_channel.to_worker_id, StateFrame(state))]
+
+        monkeypatch.setattr(context.output_manager, "emit_state", fake_emit_state)
+        monkeypatch.setattr(
+            context.output_manager,
+            "save_state_to_storage_if_needed",
+            lambda state: saved_states.append(state),
+        )
+
+        def fake_switch_context():
+            # Stand in for the DP thread: turn the pending input state into an
+            # output state through the dummy executor (always port 0).
+            spm = context.state_processing_manager
+            pending = spm.current_input_state
+            if pending is not None:
+                spm.current_output_state = DummyExecutor.process_state(pending, 0)
+
+        monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
+
+        for value in (1, 41):
+            main_loop._process_state(State({"value": value}))
+
+        # One output per input, persisted in arrival order.
+        assert [s["value"] for s in saved_states] == [2, 42]
+        assert all(s["port"] == 0 for s in saved_states)
+
+    @pytest.mark.timeout(2)
+    def test_process_start_channel_persists_produce_state_on_start_output(
+        self,
+        main_loop,
+        mock_data_output_channel,
+        monkeypatch,
+    ):
+        # A state produced by an executor's `produce_state_on_start` must be
+        # persisted through `save_state_to_storage_if_needed` as well, so a
+        # downstream worker in another region can replay it from the iceberg
+        # state table.
+        #
+        # Real path when users override `produce_state_on_start`:
+        # `_process_start_channel` → `process_input_state` →
+        # DataProcessor.process_internal_marker(StartChannel) →
+        # executor.produce_state_on_start → _set_output_state →
+        # MainLoop reads the output state → emit + save.
+        from core.models.internal_marker import StartChannel
+
+        on_start_state = State({"flag": True, "loop_counter": 0})
+
+        class DummyExecutor:
+            @staticmethod
+            def produce_state_on_start(port: int) -> State:
+                # Include the port so the test can also check which port id
+                # was forwarded.
+                return State({**on_start_state, "port": port})
+
+        context = main_loop.context
+        saved_states: list[State] = []
+        context.executor_manager.executor = DummyExecutor()
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None)
+
+        def fake_emit_state(state):
+            return [(mock_data_output_channel.to_worker_id, StateFrame(state))]
+
+        monkeypatch.setattr(context.output_manager, "emit_state", fake_emit_state)
+        monkeypatch.setattr(
+            context.output_manager,
+            "save_state_to_storage_if_needed",
+            lambda state: saved_states.append(state),
+        )
+        # ECM forwarding needs output_manager state this test never builds and
+        # is irrelevant to the save path under test, so stub it out.
+        monkeypatch.setattr(main_loop, "_send_ecm_to_data_channels", lambda *_: None)
+
+        def fake_switch_context():
+            # Stand in for the DP thread: consume a pending StartChannel marker
+            # the way DataProcessor.process_internal_marker does and publish
+            # the state produced on start.
+            tpm = context.tuple_processing_manager
+            if not isinstance(tpm.current_internal_marker, StartChannel):
+                return
+            produced = DummyExecutor.produce_state_on_start(port=0)
+            context.state_processing_manager.current_output_state = produced
+            tpm.current_internal_marker = None  # marker consumed
+
+        monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
+
+        # Same entry point `_process_ecm` uses once a StartChannel ECM arrives
+        # and the start_channel handler has set the marker.
+        context.tuple_processing_manager.current_internal_marker = StartChannel()
+        main_loop._process_start_channel()
+
+        # Without persistence, a downstream worker in a different region could
+        # never observe the state produced on start.
+        assert len(saved_states) == 1, (
+            f"produce_state_on_start emitted a state but it was not persisted "
+            f"to storage. saved_states={saved_states}"
+        )
+        assert saved_states[0]["flag"] is True
+        assert saved_states[0]["loop_counter"] == 0
+        assert saved_states[0]["port"] == 0
+
+    @pytest.mark.timeout(2)
+    def test_process_input_state_does_not_save_when_no_output(
+        self,
+        main_loop,
+        monkeypatch,
+    ):
+        # With no output state (process_state returned None) there is nothing
+        # to materialize, so save_state_to_storage_if_needed must stay unused.
+        output_manager = main_loop.context.output_manager
+        save_calls: list[State] = []
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None)
+        monkeypatch.setattr(output_manager, "emit_state", lambda state: [])
+        monkeypatch.setattr(
+            output_manager,
+            "save_state_to_storage_if_needed",
+            lambda state: save_calls.append(state),
+        )
+        # The DP side consumes the input but never sets an output state.
+        monkeypatch.setattr(main_loop, "_switch_context", lambda: None)
+
+        main_loop._process_state(State({"value": 1}))
+
+        assert not save_calls
+
@pytest.mark.timeout(2)
def test_main_loop_thread_can_process_state(
self,
diff --git
a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
index 327b907306..bdba00ba0c 100644
--- a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
@@ -24,6 +24,7 @@ from concurrent.futures import as_completed
from concurrent.futures.thread import ThreadPoolExecutor
from core.models import Schema, Tuple
+from core.models.state import State
from core.storage.document_factory import DocumentFactory
from core.storage.storage_config import StorageConfig
from core.storage.vfs_uri_factory import VFSURIFactory
@@ -49,6 +50,7 @@ StorageConfig.initialize(
rest_catalog_uri="http://localhost:8181/catalog/",
rest_catalog_warehouse_name="texera",
table_result_namespace="operator-port-result",
+ table_state_namespace="operator-port-state",
directory_path=tempfile.mkdtemp(prefix="texera-iceberg-warehouse-"),
commit_batch_size=4096,
s3_endpoint="http://localhost:9000",
@@ -81,17 +83,21 @@ class TestIcebergDocument:
with a random operator id
"""
operator_uuid = str(uuid.uuid4()).replace("-", "")
- uri = VFSURIFactory.create_result_uri(
- WorkflowIdentity(id=0),
- ExecutionIdentity(id=0),
- GlobalPortIdentity(
- op_id=PhysicalOpIdentity(
-
logical_op_id=OperatorIdentity(id=f"test-table-{operator_uuid}"),
- layer_name="main",
+ uri = VFSURIFactory.result_uri(
+ VFSURIFactory.create_port_base_uri(
+ WorkflowIdentity(id=0),
+ ExecutionIdentity(id=0),
+ GlobalPortIdentity(
+ op_id=PhysicalOpIdentity(
+ logical_op_id=OperatorIdentity(
+ id=f"test-table-{operator_uuid}"
+ ),
+ layer_name="main",
+ ),
+ port_id=PortIdentity(id=0),
+ input=False,
),
- port_id=PortIdentity(id=0),
- input=False,
- ),
+ )
)
DocumentFactory.create_document(uri, amber_schema)
document, _ = DocumentFactory.open_document(uri)
@@ -322,3 +328,85 @@ class TestIcebergDocument:
assert iceberg_document.get_count() == len(sample_items), (
"get_count should return the same number as the length of
sample_items"
)
+
+    def test_state_materialization_round_trip(self):
+        # A single State written through the iceberg writer must read back as
+        # an equal State, including bytes and nested values.
+        operator_uuid = uuid.uuid4().hex
+        output_port = GlobalPortIdentity(
+            op_id=PhysicalOpIdentity(
+                logical_op_id=OperatorIdentity(id=f"test-state-{operator_uuid}"),
+                layer_name="main",
+            ),
+            port_id=PortIdentity(id=0),
+            input=False,
+        )
+        base_uri = VFSURIFactory.create_port_base_uri(
+            WorkflowIdentity(id=0), ExecutionIdentity(id=0), output_port
+        )
+        state_uri = VFSURIFactory.state_uri(base_uri)
+        DocumentFactory.create_document(state_uri, State.SCHEMA)
+        document, _ = DocumentFactory.open_document(state_uri)
+
+        state = State(
+            {
+                "loop_counter": 3,
+                "name": "outer-loop",
+                "payload": b"\x00\x01state-bytes",
+                "nested": {"enabled": True, "values": [1, 2, 3]},
+            }
+        )
+
+        writer = document.writer(str(uuid.uuid4()))
+        writer.open()
+        writer.put_one(state.to_tuple())
+        writer.close()
+
+        rows = list(document.get())
+        assert len(rows) == 1
+        assert State.from_tuple(rows[0]) == state
+
+    def test_multiple_states_materialize_as_rows_in_one_table(self):
+        # Several States written through one writer land as separate rows of
+        # the same state table and read back as equal States.
+        operator_id = OperatorIdentity(
+            id=f"test-multiple-states-{uuid.uuid4().hex}"
+        )
+        base_uri = VFSURIFactory.create_port_base_uri(
+            WorkflowIdentity(id=0),
+            ExecutionIdentity(id=0),
+            GlobalPortIdentity(
+                op_id=PhysicalOpIdentity(logical_op_id=operator_id, layer_name="main"),
+                port_id=PortIdentity(id=0),
+                input=False,
+            ),
+        )
+        state_uri = VFSURIFactory.state_uri(base_uri)
+        DocumentFactory.create_document(state_uri, State.SCHEMA)
+        document, _ = DocumentFactory.open_document(state_uri)
+
+        states = [
+            State({"loop_counter": 0, "i": 1, "payload": b"first"}),
+            State(
+                {
+                    "loop_counter": 1,
+                    "i": 2,
+                    "payload": b"second",
+                    "nested": {"values": [3, 4]},
+                }
+            ),
+        ]
+
+        writer = document.writer(str(uuid.uuid4()))
+        writer.open()
+        for expected in states:
+            writer.put_one(expected.to_tuple())
+        writer.close()
+
+        stored_rows = list(document.get())
+        assert len(stored_rows) == len(states)
+        # The comparison does not rely on row order: sort by loop_counter first.
+        restored = sorted(
+            (State.from_tuple(row) for row in stored_rows),
+            key=lambda s: s["loop_counter"],
+        )
+        assert restored == states
diff --git
a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
new file mode 100644
index 0000000000..5016c2df2f
--- /dev/null
+++
b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from core.models import State, StateFrame
+from core.models.internal_queue import DataElement
+from core.models.schema import Schema
+from core.storage.runnables.input_port_materialization_reader_runnable import (
+ InputPortMaterializationReaderRunnable,
+)
+from proto.org.apache.texera.amber.core import (
+ ActorVirtualIdentity,
+ ChannelIdentity,
+)
+
+
+class TestRunStateReadingBlock:
+ """Cover the state-reading block in run() that opens the state
+ document and emits its rows as StateFrames directly to the input
+ queue (no partitioner filtering -- state is broadcast to every
+ worker).
+ """
+
+ @pytest.fixture
+ def me(self):
+ return ActorVirtualIdentity(name="me")
+
+ @pytest.fixture
+ def runnable(self, me):
+ instance = InputPortMaterializationReaderRunnable.__new__(
+ InputPortMaterializationReaderRunnable
+ )
+ instance.uri = "vfs:///wf/0/exec/0/result/op-a"
+ instance.worker_actor_id = me
+ instance.tuple_schema = Schema(raw_schema={"x": "INTEGER"})
+ instance._stopped = False
+ instance._finished = False
+ instance.channel_id = ChannelIdentity(me, me, is_control=False)
+ instance.queue = MagicMock()
+ instance.partitioner = MagicMock()
+ # No tuple-batches and no ECM-flush payloads in these tests.
+ instance.partitioner.flush.return_value = []
+ return instance
+
+ def test_state_rows_are_emitted_as_state_frames(self, runnable):
+ state_a = State({"loop_counter": 0})
+ state_b = State({"loop_counter": 1})
+
+ # The state document yields opaque tuples; from_tuple deserializes
+ # them. Patch from_tuple so we don't have to wire a real
+ # serialization.
+ result_doc = MagicMock()
+ result_doc.get.return_value = iter([]) # No materialized tuples.
+ state_doc = MagicMock()
+ state_doc.get.return_value = iter(["row-a", "row-b"])
+
+ with (
+ patch(
+
"core.storage.runnables.input_port_materialization_reader_runnable.DocumentFactory"
+ ) as mock_factory,
+ patch.object(State, "from_tuple") as mock_from_tuple,
+ ):
+ mock_factory.open_document.side_effect = [
+ (result_doc, runnable.tuple_schema),
+ (state_doc, None),
+ ]
+ mock_from_tuple.side_effect = [state_a, state_b]
+
+ runnable.run()
+
+ # Two StateFrames must have been put on the queue, in order.
+ # The state replay must NOT route through the partitioner --
+ # state is shared context, broadcast to every worker.
+ runnable.partitioner.flush_state.assert_not_called()
+ state_frames = [
+ call.args[0]
+ for call in runnable.queue.put.call_args_list
+ if isinstance(call.args[0], DataElement)
+ and isinstance(call.args[0].payload, StateFrame)
+ ]
+ assert [sf.payload.frame for sf in state_frames] == [state_a, state_b]
+ assert runnable._finished is True
diff --git a/amber/src/test/python/core/storage/test_document_factory.py
b/amber/src/test/python/core/storage/test_document_factory.py
new file mode 100644
index 0000000000..859c004024
--- /dev/null
+++ b/amber/src/test/python/core/storage/test_document_factory.py
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from core.models import Schema
+from core.storage.document_factory import DocumentFactory
+from core.storage.storage_config import StorageConfig
+from core.storage.vfs_uri_factory import VFSResourceType
+
+
+# Avoid initializing the real config (only initializable once per process).
+StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE = "test-result-ns"
+StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE = "test-state-ns"
+
+VFS_URI = "vfs:///wid/0/eid/0/opid/test/main/0/0/result"
+
+
[email protected]
+def schema():
+ return Schema(raw_schema={"x": "INTEGER"})
+
+
+def _decode_returning(resource_type):
+ """Helper: build a VFSURIFactory.decode_uri side_effect."""
+ return lambda _uri: (None, None, None, resource_type)
+
+
+@patch("core.storage.document_factory.IcebergDocument")
+@patch("core.storage.document_factory.amber_schema_to_iceberg_schema")
+@patch("core.storage.document_factory.create_table")
+@patch("core.storage.document_factory.IcebergCatalogInstance")
+@patch("core.storage.document_factory.VFSURIFactory")
+class TestCreateDocumentNamespaceRouting:
+ def test_state_resource_type_uses_state_namespace(
+ self, mock_vfs, _icb, mock_create_table, _amber_schema, _doc, schema
+ ):
+ mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+ mock_vfs.decode_uri.side_effect =
_decode_returning(VFSResourceType.STATE)
+
+ DocumentFactory.create_document(VFS_URI, schema)
+
+ args, _ = mock_create_table.call_args
+ assert args[1] == StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+
+ def test_result_resource_type_uses_result_namespace(
+ self, mock_vfs, _icb, mock_create_table, _amber_schema, _doc, schema
+ ):
+ mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+ mock_vfs.decode_uri.side_effect =
_decode_returning(VFSResourceType.RESULT)
+
+ DocumentFactory.create_document(VFS_URI, schema)
+
+ args, _ = mock_create_table.call_args
+ assert args[1] == StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+
+ def test_unsupported_resource_type_raises_value_error(
+ self, mock_vfs, _icb, _create_table, _amber_schema, _doc, schema
+ ):
+ mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+ # CONSOLE_MESSAGES has no namespace mapping in the Python factory.
+ mock_vfs.decode_uri.side_effect = _decode_returning(
+ VFSResourceType.CONSOLE_MESSAGES
+ )
+
+ with pytest.raises(ValueError, match="not supported"):
+ DocumentFactory.create_document(VFS_URI, schema)
+
+
+def test_create_document_rejects_non_vfs_scheme(schema):
+ with pytest.raises(NotImplementedError, match="Unsupported URI scheme"):
+ DocumentFactory.create_document("file:///tmp/x", schema)
+
+
+@patch("core.storage.document_factory.IcebergDocument")
+@patch("core.storage.document_factory.Schema")
+@patch("core.storage.document_factory.load_table_metadata")
+@patch("core.storage.document_factory.IcebergCatalogInstance")
+@patch("core.storage.document_factory.VFSURIFactory")
+class TestOpenDocumentNamespaceRouting:
+ @staticmethod
+ def _stub_table():
+ table = MagicMock()
+ table.schema.return_value.as_arrow.return_value = MagicMock()
+ return table
+
+ def test_state_resource_type_uses_state_namespace(
+ self, mock_vfs, _icb, mock_load, _schema_cls, _doc
+ ):
+ mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+ mock_vfs.decode_uri.side_effect =
_decode_returning(VFSResourceType.STATE)
+ mock_load.return_value = self._stub_table()
+
+ DocumentFactory.open_document(VFS_URI)
+
+ args, _ = mock_load.call_args
+ assert args[1] == StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+
+ def test_unsupported_resource_type_raises_value_error(
+ self, mock_vfs, _icb, _load, _schema_cls, _doc
+ ):
+ mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+ mock_vfs.decode_uri.side_effect = _decode_returning(
+ VFSResourceType.CONSOLE_MESSAGES
+ )
+
+ with pytest.raises(ValueError, match="not supported"):
+ DocumentFactory.open_document(VFS_URI)
+
+ def test_missing_table_raises_value_error(
+ self, mock_vfs, _icb, mock_load, _schema_cls, _doc
+ ):
+ mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+ mock_vfs.decode_uri.side_effect =
_decode_returning(VFSResourceType.STATE)
+ mock_load.return_value = None
+
+ with pytest.raises(ValueError, match="No storage is found"):
+ DocumentFactory.open_document(VFS_URI)
diff --git
a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
index 64c7080e52..1942e91f8b 100644
--- a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
+++ b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
@@ -34,6 +34,7 @@ class TestLargeBinaryManager:
rest_catalog_uri="http://localhost:8181/catalog/",
rest_catalog_warehouse_name="texera",
table_result_namespace="test",
+ table_state_namespace="test-state",
directory_path="/tmp/test",
commit_batch_size=1000,
s3_endpoint="http://localhost:9000",
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortStorageWriterThreadSpec.scala
similarity index 90%
rename from
amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala
rename to
amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortStorageWriterThreadSpec.scala
index 31d8c41611..d7ab0c1831 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortStorageWriterThreadSpec.scala
@@ -32,7 +32,7 @@ import org.scalatest.flatspec.AnyFlatSpec
import scala.collection.mutable
-class OutputPortResultWriterThreadSpec extends AnyFlatSpec {
+class OutputPortStorageWriterThreadSpec extends AnyFlatSpec {
private class StubWriter(
onPutOne: () => Unit = () => (),
@@ -51,9 +51,9 @@ class OutputPortResultWriterThreadSpec extends AnyFlatSpec {
private def throwing(msg: String): () => Unit = () => throw new
RuntimeException(msg)
- "OutputPortResultWriterThread" should "leave getFailure empty on a clean
run" in {
+ "OutputPortStorageWriterThread" should "leave getFailure empty on a clean
run" in {
val writer = new StubWriter()
- val thread = new OutputPortResultWriterThread(writer)
+ val thread = new OutputPortStorageWriterThread(writer)
thread.start()
thread.queue.put(Right(PortStorageWriterTerminateSignal))
thread.join()
@@ -63,7 +63,7 @@ class OutputPortResultWriterThreadSpec extends AnyFlatSpec {
it should "capture a close() exception in getFailure so the worker can
re-throw" in {
val writer = new StubWriter(onClose = throwing("test close failure"))
- val thread = new OutputPortResultWriterThread(writer)
+ val thread = new OutputPortStorageWriterThread(writer)
thread.start()
thread.queue.put(Right(PortStorageWriterTerminateSignal))
thread.join()
@@ -73,7 +73,7 @@ class OutputPortResultWriterThreadSpec extends AnyFlatSpec {
it should "capture a putOne exception and still call close()" in {
val writer = new StubWriter(onPutOne = throwing("test putOne failure"))
- val thread = new OutputPortResultWriterThread(writer)
+ val thread = new OutputPortStorageWriterThread(writer)
thread.start()
thread.queue.put(Left(null.asInstanceOf[Tuple]))
thread.queue.put(Right(PortStorageWriterTerminateSignal))
@@ -89,7 +89,7 @@ class OutputPortResultWriterThreadSpec extends AnyFlatSpec {
onPutOne = throwing("test putOne failure"),
onClose = throwing("test close failure")
)
- val thread = new OutputPortResultWriterThread(writer)
+ val thread = new OutputPortStorageWriterThread(writer)
thread.start()
thread.queue.put(Left(null.asInstanceOf[Tuple]))
thread.queue.put(Right(PortStorageWriterTerminateSignal))
@@ -110,14 +110,14 @@ class OutputPortResultWriterThreadSpec extends
AnyFlatSpec {
private def installWriterThread(
manager: OutputManager,
portId: PortIdentity,
- thread: OutputPortResultWriterThread
+ thread: OutputPortStorageWriterThread
): Unit = {
val field = classOf[OutputManager]
.getDeclaredField("outputPortResultWriterThreads")
field.setAccessible(true)
field
.get(manager)
- .asInstanceOf[mutable.HashMap[PortIdentity,
OutputPortResultWriterThread]]
+ .asInstanceOf[mutable.HashMap[PortIdentity,
OutputPortStorageWriterThread]]
.put(portId, thread)
}
@@ -130,7 +130,7 @@ class OutputPortResultWriterThreadSpec extends AnyFlatSpec {
)
val portId = PortIdentity()
val failingWriter = new StubWriter(onClose = throwing("test close
failure"))
- val failingThread = new OutputPortResultWriterThread(failingWriter)
+ val failingThread = new OutputPortStorageWriterThread(failingWriter)
failingThread.start()
installWriterThread(outputManager, portId, failingThread)
val ex = intercept[RuntimeException] {
diff --git a/common/config/src/main/resources/storage.conf
b/common/config/src/main/resources/storage.conf
index 29d5f7be51..0682109f19 100644
--- a/common/config/src/main/resources/storage.conf
+++ b/common/config/src/main/resources/storage.conf
@@ -58,6 +58,9 @@ storage {
runtime-statistics-namespace = "workflow-runtime-statistics"
runtime-statistics-namespace =
${?STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE}
+ state-namespace = "operator-port-state"
+ state-namespace = ${?STORAGE_ICEBERG_TABLE_STATE_NAMESPACE}
+
commit {
batch-size = 4096 # decide the buffer size of our
IcebergTableWriter
batch-size = ${?STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE}
diff --git
a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
index 9ec52bba65..123c56505e 100644
---
a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
+++
b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
@@ -67,6 +67,7 @@ object EnvironmentalVariable {
"STORAGE_ICEBERG_TABLE_CONSOLE_MESSAGES_NAMESPACE"
val ENV_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE =
"STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE"
+ val ENV_ICEBERG_TABLE_STATE_NAMESPACE =
"STORAGE_ICEBERG_TABLE_STATE_NAMESPACE"
val ENV_ICEBERG_TABLE_COMMIT_BATCH_SIZE =
"STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE"
val ENV_ICEBERG_TABLE_COMMIT_NUM_RETRIES =
"STORAGE_ICEBERG_TABLE_COMMIT_NUM_RETRIES"
val ENV_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS =
"STORAGE_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS"
diff --git
a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
index 728e3c0c2d..07447cfdbe 100644
---
a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
+++
b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
@@ -54,6 +54,8 @@ object StorageConfig {
conf.getString("storage.iceberg.table.console-messages-namespace")
val icebergTableRuntimeStatisticsNamespace: String =
conf.getString("storage.iceberg.table.runtime-statistics-namespace")
+ val icebergTableStateNamespace: String =
+ conf.getString("storage.iceberg.table.state-namespace")
val icebergTableCommitBatchSize: Int =
conf.getInt("storage.iceberg.table.commit.batch-size")
val icebergTableCommitNumRetries: Int =
@@ -111,6 +113,7 @@ object StorageConfig {
"STORAGE_ICEBERG_TABLE_CONSOLE_MESSAGES_NAMESPACE"
val ENV_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE =
"STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE"
+ val ENV_ICEBERG_TABLE_STATE_NAMESPACE =
"STORAGE_ICEBERG_TABLE_STATE_NAMESPACE"
val ENV_ICEBERG_TABLE_COMMIT_BATCH_SIZE =
"STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE"
val ENV_ICEBERG_TABLE_COMMIT_NUM_RETRIES =
"STORAGE_ICEBERG_TABLE_COMMIT_NUM_RETRIES"
val ENV_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS =
"STORAGE_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS"
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
index 15949ef471..00f6c70ba7 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
@@ -72,6 +72,7 @@ object DocumentFactory {
case RESULT => StorageConfig.icebergTableResultNamespace
case CONSOLE_MESSAGES =>
StorageConfig.icebergTableConsoleMessagesNamespace
case RUNTIME_STATISTICS =>
StorageConfig.icebergTableRuntimeStatisticsNamespace
+ case STATE => StorageConfig.icebergTableStateNamespace
case _ =>
throw new IllegalArgumentException(s"Resource type $resourceType
is not supported")
}
@@ -119,6 +120,7 @@ object DocumentFactory {
case RESULT => StorageConfig.icebergTableResultNamespace
case CONSOLE_MESSAGES =>
StorageConfig.icebergTableConsoleMessagesNamespace
case RUNTIME_STATISTICS =>
StorageConfig.icebergTableRuntimeStatisticsNamespace
+ case STATE => StorageConfig.icebergTableStateNamespace
case _ =>
throw new IllegalArgumentException(s"Resource type $resourceType
is not supported")
}
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
index 0fbee64457..291c31896b 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
@@ -34,6 +34,7 @@ object VFSResourceType extends Enumeration {
val RESULT: Value = Value("result")
val RUNTIME_STATISTICS: Value = Value("runtimeStatistics")
val CONSOLE_MESSAGES: Value = Value("consoleMessages")
+ val STATE: Value = Value("state")
}
object VFSURIFactory {
@@ -83,18 +84,25 @@ object VFSURIFactory {
}
/**
- * Create a URI pointing to a result storage
+ * Create the base URI for a port. Result and state URIs are derived
+ * from this base via `resultURI` / `stateURI`.
*/
- def createResultURI(
+ def createPortBaseURI(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity,
globalPortId: GlobalPortIdentity
- ): URI = {
- val baseUri =
-
s"$VFS_FILE_URI_SCHEME:///wid/${workflowId.id}/eid/${executionId.id}/globalportid/${globalPortId.serializeAsString}"
+ ): URI =
+ new URI(
+ s"$VFS_FILE_URI_SCHEME:///wid/${workflowId.id}/eid/${executionId.id}" +
+ s"/globalportid/${globalPortId.serializeAsString}"
+ )
- new URI(s"$baseUri/${VFSResourceType.RESULT.toString.toLowerCase}")
- }
+ def resultURI(baseURI: URI): URI = appendResource(baseURI,
VFSResourceType.RESULT)
+
+ def stateURI(baseURI: URI): URI = appendResource(baseURI,
VFSResourceType.STATE)
+
+ private def appendResource(baseURI: URI, resourceType:
VFSResourceType.Value): URI =
+ new URI(s"$baseURI/${resourceType.toString.toLowerCase}")
/**
* Create a URI pointing to runtime statistics
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
index 6fbe35873a..0b8ae4a19c 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
@@ -42,23 +42,30 @@ class VFSURIFactorySpec extends AnyFlatSpec {
input = true
)
- "VFSURIFactory.createResultURI" should "include workflow, execution, port,
and the result resource type" in {
- val uri = VFSURIFactory.createResultURI(workflowId, executionId, portId)
- assert(uri.getScheme == VFSURIFactory.VFS_FILE_URI_SCHEME)
- val path = uri.getPath
+ "VFSURIFactory.createPortBaseURI" should "include workflow, execution, and
port segments without a resource type" in {
+ val baseURI = VFSURIFactory.createPortBaseURI(workflowId, executionId,
portId)
+ assert(baseURI.getScheme == VFSURIFactory.VFS_FILE_URI_SCHEME)
+ val path = baseURI.getPath
assert(path.contains("/wid/7"))
assert(path.contains("/eid/11"))
assert(path.contains("/globalportid/"))
- assert(path.endsWith("/result"))
+ assert(!path.endsWith("/result"))
+ assert(!path.endsWith("/state"))
}
- it should "round-trip through decodeURI" in {
- val uri = VFSURIFactory.createResultURI(workflowId, executionId, portId)
- val (wid, eid, globalPortIdOpt, resourceType) =
VFSURIFactory.decodeURI(uri)
+ "VFSURIFactory.resultURI / stateURI" should "append the resource segment and
round-trip through decodeURI" in {
+ val baseURI = VFSURIFactory.createPortBaseURI(workflowId, executionId,
portId)
+ val resultURI = VFSURIFactory.resultURI(baseURI)
+ val stateURI = VFSURIFactory.stateURI(baseURI)
+ assert(resultURI.getPath.endsWith("/result"))
+ assert(stateURI.getPath.endsWith("/state"))
+
+ val (wid, eid, globalPortIdOpt, resourceType) =
VFSURIFactory.decodeURI(resultURI)
assert(wid == workflowId)
assert(eid == executionId)
assert(globalPortIdOpt.contains(portId))
assert(resourceType == VFSResourceType.RESULT)
+ assert(VFSURIFactory.decodeURI(stateURI)._4 == VFSResourceType.STATE)
}
"VFSURIFactory.createRuntimeStatisticsURI" should "produce a
runtimeStatistics URI without an opid segment" in {
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index eb259fed58..b92562eeb7 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -20,6 +20,7 @@
package org.apache.texera.amber.storage.result.iceberg
import org.apache.texera.amber.config.StorageConfig
+import org.apache.texera.amber.core.state.State
import org.apache.texera.amber.core.storage.model.{VirtualDocument,
VirtualDocumentSpec}
import org.apache.texera.amber.core.storage.{DocumentFactory,
IcebergCatalogInstance, VFSURIFactory}
import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema,
Tuple}
@@ -51,6 +52,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple]
with BeforeAndAfter
var deserde: (IcebergSchema, Record) => Tuple = _
var catalog: Catalog = _
val tableNamespace = "test_namespace"
+ var baseURI: URI = _
var uri: URI = _
override def beforeAll(): Unit = {
@@ -79,7 +81,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple]
with BeforeAndAfter
override def beforeEach(): Unit = {
// Generate a unique table name for each test
- uri = VFSURIFactory.createResultURI(
+ baseURI = VFSURIFactory.createPortBaseURI(
WorkflowIdentity(0),
ExecutionIdentity(0),
GlobalPortIdentity(
@@ -91,6 +93,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple]
with BeforeAndAfter
PortIdentity()
)
)
+ uri = VFSURIFactory.resultURI(baseURI)
DocumentFactory.createDocument(uri, amberSchema)
super.beforeEach()
}
@@ -141,6 +144,84 @@ class IcebergDocumentSpec extends
VirtualDocumentSpec[Tuple] with BeforeAndAfter
}
}
+ it should "round trip materialized state documents" in {
+ val stateUri = VFSURIFactory.stateURI(baseURI)
+ DocumentFactory.createDocument(stateUri, State.schema)
+ val stateDocument =
+
DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]]
+ val state = State(
+ Map(
+ "loop_counter" -> 3,
+ "name" -> "outer-loop",
+ "payload" -> Array[Byte](0, 1, 2, 3),
+ "nested" -> Map("enabled" -> true, "values" -> List(1, 2, 3))
+ )
+ )
+
+ val writer = stateDocument.writer(UUID.randomUUID().toString)
+ writer.open()
+ writer.putOne(state.toTuple)
+ writer.close()
+
+ val storedRows = stateDocument.get().toList
+ assert(storedRows.length == 1)
+ val deserialized = State.fromTuple(storedRows.head).values
+ assert(deserialized("loop_counter") == 3L)
+ assert(deserialized("name") == "outer-loop")
+
assert(deserialized("payload").asInstanceOf[Array[Byte]].sameElements(Array[Byte](0,
1, 2, 3)))
+ assert(deserialized("nested").asInstanceOf[Map[String, Any]]("enabled") ==
true)
+ assert(deserialized("nested").asInstanceOf[Map[String, Any]]("values") ==
List(1L, 2L, 3L))
+ }
+
+ it should "materialize multiple states as rows in one state table" in {
+ val stateUri = VFSURIFactory.stateURI(baseURI)
+ DocumentFactory.createDocument(stateUri, State.schema)
+ val stateDocument =
+
DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]]
+ val states: List[State] = List(
+ State(Map("loop_counter" -> 0, "i" -> 1, "payload" -> Array[Byte](1, 2,
3))),
+ State(
+ Map(
+ "loop_counter" -> 1,
+ "i" -> 2,
+ "payload" -> Array[Byte](4, 5, 6),
+ "nested" -> Map("values" -> List(3, 4))
+ )
+ )
+ )
+
+ val writer = stateDocument.writer(UUID.randomUUID().toString)
+ writer.open()
+ states.foreach(state => writer.putOne(state.toTuple))
+ writer.close()
+
+ val deserializedStates =
+ stateDocument
+ .get()
+ .toList
+ .map(State.fromTuple)
+ .sortBy(_.values("loop_counter").asInstanceOf[Long])
+ assert(deserializedStates.length == states.length)
+ deserializedStates.zip(states).foreach {
+ case (actual, expected) =>
+ assert(
+ actual.values("loop_counter") ==
expected.values("loop_counter").asInstanceOf[Int].toLong
+ )
+ assert(actual.values("i") ==
expected.values("i").asInstanceOf[Int].toLong)
+ assert(
+ actual
+ .values("payload")
+ .asInstanceOf[Array[Byte]]
+ .sameElements(expected.values("payload").asInstanceOf[Array[Byte]])
+ )
+ }
+ assert(
+ deserializedStates(1)
+ .values("nested")
+ .asInstanceOf[Map[String, Any]]("values") == List(3L, 4L)
+ )
+ }
+
/** Returns a dynamic proxy for `realTable` that increments `counter` on
every `refresh()` call. */
private def tableWithRefreshSpy(realTable: Table, counter: AtomicInteger):
Table =
Proxy
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
index b7cf776eb8..956e9f42b9 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
@@ -50,16 +50,18 @@ class IcebergTableStatsSpec extends AnyFlatSpec with
BeforeAndAfterAll with Suit
var deserde: (IcebergSchema, Record) => Tuple = _
var catalog: Catalog = _
val tableNamespace = "test_namespace"
- var uri: URI = VFSURIFactory.createResultURI(
- WorkflowIdentity(0),
- ExecutionIdentity(0),
- GlobalPortIdentity(
- PhysicalOpIdentity(
- logicalOpId =
-
OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"),
- layerName = "main"
- ),
- PortIdentity()
+ var uri: URI = VFSURIFactory.resultURI(
+ VFSURIFactory.createPortBaseURI(
+ WorkflowIdentity(0),
+ ExecutionIdentity(0),
+ GlobalPortIdentity(
+ PhysicalOpIdentity(
+ logicalOpId =
+
OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"),
+ layerName = "main"
+ ),
+ PortIdentity()
+ )
)
)