This is an automated email from the ASF dual-hosted git repository. aglinxinyuan pushed a commit to branch xinyuan-state-materialization in repository https://gitbox.apache.org/repos/asf/texera.git
commit 581d574ca694160a2b29fdbb62969e63a332bc09 Author: Xinyuan Lin <[email protected]> AuthorDate: Mon May 4 23:00:03 2026 -0700 feat(amber): materialize per-port state to Iceberg storage Adds a state-materialization path alongside the existing tuple-result storage. State produced by an operator's processState is written to a companion Iceberg table whose URI is derived from the result URI. The input-port materialization reader replays both tuples and states into downstream workers. Key pieces: - New STATE resource type and a state-namespace storage config entry on both Python and Scala sides; namespaces are read from StorageConfig instead of hardcoded strings. - RegionExecutionCoordinator provisions a state document next to every result document at scheduling time, so writers can rely on its presence without try/catch (the Python input-port materialization reader still wraps state replay in a try/except ValueError and skips replay when no state document can be opened). - One long-lived BufferedItemWriter per output port, opened at port setup and closed at port completion, so a single Iceberg snapshot is produced per port instead of one per state. - DataProcessor.processInputState (Scala) and MainLoop.process_input_state (Python) persist the executor's *output* state, matching the state that is also emitted downstream. - New Python and Scala unit tests covering the State JSON wire format, the OutputManager state-writer lifecycle, the reader's state-replay block, and DocumentFactory namespace routing. 
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> --- .../core/architecture/packaging/output_manager.py | 30 +++- .../architecture/packaging/test_output_manager.py | 127 ++++++++++++++ amber/src/main/python/core/models/state.py | 4 + amber/src/main/python/core/runnables/main_loop.py | 1 + .../main/python/core/runnables/test_main_loop.py | 76 +++++++++ .../main/python/core/storage/document_factory.py | 107 ++++++------ .../core/storage/iceberg/test_iceberg_document.py | 84 +++++++++ .../input_port_materialization_reader_runnable.py | 28 ++- ...t_input_port_materialization_reader_runnable.py | 190 +++++++++++++++++++++ .../src/main/python/core/storage/storage_config.py | 3 + .../python/core/storage/test_document_factory.py | 134 +++++++++++++++ .../main/python/core/storage/vfs_uri_factory.py | 1 + .../pytexera/storage/test_large_binary_manager.py | 1 + amber/src/main/python/texera_run_python_worker.py | 2 + .../messaginglayer/OutputManager.scala | 19 ++- .../pythonworker/PythonWorkflowWorker.scala | 1 + .../scheduling/RegionExecutionCoordinator.scala | 3 + .../engine/architecture/worker/DataProcessor.scala | 1 + .../InputPortMaterializationReaderThread.scala | 26 ++- common/config/src/main/resources/storage.conf | 3 + .../amber/config/EnvironmentalVariable.scala | 1 + .../apache/texera/amber/config/StorageConfig.scala | 3 + .../org/apache/texera/amber/core/state/State.scala | 4 + .../amber/core/storage/DocumentFactory.scala | 2 + .../texera/amber/core/storage/VFSURIFactory.scala | 1 + .../result/iceberg/IcebergDocumentSpec.scala | 79 +++++++++ 26 files changed, 877 insertions(+), 54 deletions(-) diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py index bf4afbf396..08fa210eca 100644 --- a/amber/src/main/python/core/architecture/packaging/output_manager.py +++ b/amber/src/main/python/core/architecture/packaging/output_manager.py @@ -87,6 +87,8 @@ class 
OutputManager: PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread] ] = dict() + self._state_writers: typing.Dict[PortIdentity, typing.Any] = dict() + def is_missing_output_ports(self): """ This method is only used for ensuring correct region execution. @@ -124,7 +126,8 @@ class OutputManager: def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri: str): """ Create a separate thread for saving output tuples of a port - to storage in batch. + to storage in batch, and open a long-lived buffered writer for + state materialization on the same port. """ document, _ = DocumentFactory.open_document(storage_uri) buffered_item_writer = document.writer(str(get_worker_index(self.worker_id))) @@ -144,6 +147,13 @@ class OutputManager: writer_thread, ) + state_document, _ = DocumentFactory.open_document( + State.uri_from_result_uri(storage_uri) + ) + state_writer = state_document.writer(str(get_worker_index(self.worker_id))) + state_writer.open() + self._state_writers[port_id] = state_writer + def get_port(self, port_id=None) -> WorkerPort: return list(self._ports.values())[0] @@ -171,6 +181,19 @@ class OutputManager: PortStorageWriterElement(data_tuple=tuple_) ) + def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None: + # Buffer the state on each long-lived writer; the writer flushes + # itself when its buffer fills, and the remaining buffer is + # flushed in close_port_storage_writers. 
+ if port_id is None: + writers = self._state_writers.values() + elif port_id in self._state_writers: + writers = [self._state_writers[port_id]] + else: + return + for writer in writers: + writer.put_one(state.to_tuple()) + def close_port_storage_writers(self) -> None: """ Flush the buffers of port storage writers and wait for all the @@ -184,6 +207,11 @@ class OutputManager: for _, _, writer_thread in self._port_storage_writers.values(): # This blocking call will wait for all the writer to finish commit writer_thread.join() + # Close the long-lived state writers so the remaining buffered + # states are committed in a single Iceberg snapshot per port. + for state_writer in self._state_writers.values(): + state_writer.close() + self._state_writers.clear() def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning) -> None: """ diff --git a/amber/src/main/python/core/architecture/packaging/test_output_manager.py b/amber/src/main/python/core/architecture/packaging/test_output_manager.py new file mode 100644 index 0000000000..8f1daf8052 --- /dev/null +++ b/amber/src/main/python/core/architecture/packaging/test_output_manager.py @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from unittest.mock import MagicMock, patch + +import pytest + +from core.architecture.packaging.output_manager import OutputManager +from core.models.state import State +from proto.org.apache.texera.amber.core import PortIdentity + + +class TestSaveStateToStorageIfNeeded: + @pytest.fixture + def output_manager(self): + return OutputManager(worker_id="Worker:WF0-test-main-0") + + @pytest.fixture + def port_a(self): + return PortIdentity(id=0, internal=False) + + @pytest.fixture + def port_b(self): + return PortIdentity(id=1, internal=False) + + @pytest.fixture + def state(self): + return State({"loop_counter": 1, "i": 2}) + + def test_no_state_writers_is_a_noop(self, output_manager, state): + # With no port set up, save_state_to_storage_if_needed must not + # touch any writer. + output_manager.save_state_to_storage_if_needed(state) # no-op, no exception + + def test_unknown_port_id_is_a_noop(self, output_manager, state, port_a): + output_manager.save_state_to_storage_if_needed(state, port_id=port_a) + # No assertion needed -- the absence of any writer means nothing + # was attempted. + + def test_writes_to_every_port_when_port_id_omitted( + self, output_manager, state, port_a, port_b + ): + writer_a = MagicMock() + writer_b = MagicMock() + output_manager._state_writers[port_a] = writer_a + output_manager._state_writers[port_b] = writer_b + + output_manager.save_state_to_storage_if_needed(state) + + writer_a.put_one.assert_called_once() + writer_b.put_one.assert_called_once() + # Long-lived writers must NOT be closed per state -- otherwise + # we'd be back to one Iceberg snapshot per state. 
+ writer_a.close.assert_not_called() + writer_b.close.assert_not_called() + + def test_writes_only_to_selected_port_when_port_id_specified( + self, output_manager, state, port_a, port_b + ): + writer_a = MagicMock() + writer_b = MagicMock() + output_manager._state_writers[port_a] = writer_a + output_manager._state_writers[port_b] = writer_b + + output_manager.save_state_to_storage_if_needed(state, port_id=port_a) + + writer_a.put_one.assert_called_once() + writer_b.put_one.assert_not_called() + + def test_state_writer_is_opened_at_port_setup(self, output_manager, port_a): + # set_up_port_storage_writer should open the result document AND + # the state document, then cache the state writer for reuse. + result_doc = MagicMock() + state_doc = MagicMock() + state_writer = MagicMock() + state_doc.writer.return_value = state_writer + + with patch( + "core.architecture.packaging.output_manager.DocumentFactory" + ) as mock_factory: + mock_factory.open_document.side_effect = [ + (result_doc, MagicMock()), + (state_doc, MagicMock()), + ] + + output_manager.set_up_port_storage_writer( + port_a, "vfs:///wf/0/exec/0/result/op-a" + ) + + opened = [c.args[0] for c in mock_factory.open_document.call_args_list] + assert opened == [ + "vfs:///wf/0/exec/0/result/op-a", + "vfs:///wf/0/exec/0/state/op-a", + ] + state_writer.open.assert_called_once() + assert output_manager._state_writers[port_a] is state_writer + + def test_close_port_storage_writers_flushes_state_writers( + self, output_manager, port_a, port_b + ): + # After the port completes, the long-lived state writer's buffer + # must be flushed and the writer closed (one Iceberg commit per + # port instead of one per state). 
+ writer_a = MagicMock() + writer_b = MagicMock() + output_manager._state_writers[port_a] = writer_a + output_manager._state_writers[port_b] = writer_b + + output_manager.close_port_storage_writers() + + writer_a.close.assert_called_once() + writer_b.close.assert_called_once() + assert output_manager._state_writers == {} diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py index 003aaa212a..3ce610bbee 100644 --- a/amber/src/main/python/core/models/state.py +++ b/amber/src/main/python/core/models/state.py @@ -41,6 +41,10 @@ class State(dict): def from_tuple(cls, row: Tuple) -> "State": return cls.from_json(row[cls.CONTENT]) + @staticmethod + def uri_from_result_uri(result_uri: str) -> str: + return result_uri.replace("/result", "/state") + _TYPE_MARKER = "__texera_type__" _PAYLOAD_MARKER = "payload" diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py index ab35cda81b..1334af12bf 100644 --- a/amber/src/main/python/core/runnables/main_loop.py +++ b/amber/src/main/python/core/runnables/main_loop.py @@ -202,6 +202,7 @@ class MainLoop(StoppableQueueBlockingRunnable): payload=batch, ) ) + self.context.output_manager.save_state_to_storage_if_needed(output_state) def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]: """ diff --git a/amber/src/main/python/core/runnables/test_main_loop.py b/amber/src/main/python/core/runnables/test_main_loop.py index c9daa633f5..534493f0c2 100644 --- a/amber/src/main/python/core/runnables/test_main_loop.py +++ b/amber/src/main/python/core/runnables/test_main_loop.py @@ -1388,6 +1388,82 @@ class TestMainLoop: assert second_output.payload.frame["value"] == 42 assert second_output.payload.frame["port"] == 0 + @pytest.mark.timeout(2) + def test_process_input_state_persists_output_state_to_storage( + self, + main_loop, + mock_data_output_channel, + monkeypatch, + ): + # process_input_state must invoke 
save_state_to_storage_if_needed + # with the freshly emitted output state, so every state that flows + # downstream is also durable on the upstream output port. + class DummyExecutor: + @staticmethod + def process_state(state: State, port: int) -> State: + return State({"value": state["value"] + 1, "port": port}) + + saved_states: list[State] = [] + main_loop.context.executor_manager.executor = DummyExecutor() + monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None) + monkeypatch.setattr( + main_loop.context.output_manager, + "emit_state", + lambda state: [(mock_data_output_channel.to_worker_id, StateFrame(state))], + ) + monkeypatch.setattr( + main_loop.context.output_manager, + "save_state_to_storage_if_needed", + lambda state: saved_states.append(state), + ) + + def fake_switch_context(): + current_input_state = ( + main_loop.context.state_processing_manager.current_input_state + ) + if current_input_state is not None: + main_loop.context.state_processing_manager.current_output_state = ( + DummyExecutor.process_state(current_input_state, 0) + ) + + monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context) + + main_loop._process_state(State({"value": 1})) + main_loop._process_state(State({"value": 41})) + + # Each input state produced one output state, so both must have + # been persisted in order. + assert [s["value"] for s in saved_states] == [2, 42] + assert all(s["port"] == 0 for s in saved_states) + + @pytest.mark.timeout(2) + def test_process_input_state_does_not_save_when_no_output( + self, + main_loop, + monkeypatch, + ): + # When the executor returns no output state (process_state returned + # None), save_state_to_storage_if_needed must not be called -- no + # state means nothing to materialize. 
+ save_calls: list[State] = [] + monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None) + monkeypatch.setattr( + main_loop.context.output_manager, + "emit_state", + lambda state: [], + ) + monkeypatch.setattr( + main_loop.context.output_manager, + "save_state_to_storage_if_needed", + lambda state: save_calls.append(state), + ) + # Pretend DataProc consumed the input but produced no output. + monkeypatch.setattr(main_loop, "_switch_context", lambda: None) + + main_loop._process_state(State({"value": 1})) + + assert save_calls == [] + @pytest.mark.timeout(2) def test_main_loop_thread_can_process_state( self, diff --git a/amber/src/main/python/core/storage/document_factory.py b/amber/src/main/python/core/storage/document_factory.py index 9b686ab66b..bd690ceb59 100644 --- a/amber/src/main/python/core/storage/document_factory.py +++ b/amber/src/main/python/core/storage/document_factory.py @@ -61,30 +61,35 @@ class DocumentFactory: if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME: _, _, _, resource_type = VFSURIFactory.decode_uri(uri) - if resource_type in {VFSResourceType.RESULT}: - storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) - - # Convert Amber Schema to Iceberg Schema with LARGE_BINARY - # field name encoding - iceberg_schema = amber_schema_to_iceberg_schema(schema) - - create_table( - IcebergCatalogInstance.get_instance(), - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - iceberg_schema, - override_if_exists=True, - ) - - return IcebergDocument[Tuple]( - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - iceberg_schema, - amber_tuples_to_arrow_table, - arrow_table_to_amber_tuples, - ) - else: - raise ValueError(f"Resource type {resource_type} is not supported") + match resource_type: + case VFSResourceType.RESULT: + namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE + case VFSResourceType.STATE: + namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE + case _: + raise 
ValueError(f"Resource type {resource_type} is not supported") + + storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) + # Convert Amber Schema to Iceberg Schema with LARGE_BINARY + # field name encoding + iceberg_schema = amber_schema_to_iceberg_schema(schema) + + create_table( + IcebergCatalogInstance.get_instance(), + namespace, + storage_key, + iceberg_schema, + override_if_exists=True, + ) + + return IcebergDocument[Tuple]( + namespace, + storage_key, + iceberg_schema, + amber_tuples_to_arrow_table, + arrow_table_to_amber_tuples, + ) + else: raise NotImplementedError( f"Unsupported URI scheme: {parsed_uri.scheme} for creating the document" @@ -96,30 +101,36 @@ class DocumentFactory: if parsed_uri.scheme == "vfs": _, _, _, resource_type = VFSURIFactory.decode_uri(uri) - if resource_type in {VFSResourceType.RESULT}: - storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) - - table = load_table_metadata( - IcebergCatalogInstance.get_instance(), - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - ) - - if table is None: - raise ValueError("No storage is found for the given URI") - - amber_schema = Schema(table.schema().as_arrow()) - - document = IcebergDocument( - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - table.schema(), - amber_tuples_to_arrow_table, - arrow_table_to_amber_tuples, - ) - return document, amber_schema - else: - raise ValueError(f"Resource type {resource_type} is not supported") + match resource_type: + case VFSResourceType.RESULT: + namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE + case VFSResourceType.STATE: + namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE + case _: + raise ValueError(f"Resource type {resource_type} is not supported") + + storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) + + table = load_table_metadata( + IcebergCatalogInstance.get_instance(), + namespace, + storage_key, + ) + + if table is None: + raise ValueError("No storage is found for the given URI") + 
+ amber_schema = Schema(table.schema().as_arrow()) + + document = IcebergDocument( + namespace, + storage_key, + table.schema(), + amber_tuples_to_arrow_table, + arrow_table_to_amber_tuples, + ) + return document, amber_schema + else: raise NotImplementedError( f"Unsupported URI scheme: {parsed_uri.scheme} for opening the document" diff --git a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py index 9b374f7d5c..032376ae31 100644 --- a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py +++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py @@ -23,6 +23,7 @@ from concurrent.futures import as_completed from concurrent.futures.thread import ThreadPoolExecutor from core.models import Schema, Tuple +from core.models.state import State from core.storage.document_factory import DocumentFactory from core.storage.storage_config import StorageConfig from core.storage.vfs_uri_factory import VFSURIFactory @@ -44,6 +45,7 @@ StorageConfig.initialize( rest_catalog_uri="http://localhost:8181/catalog/", rest_catalog_warehouse_name="texera", table_result_namespace="operator-port-result", + table_state_namespace="operator-port-state", directory_path="../../../../../../amber/user-resources/workflow-results", commit_batch_size=4096, s3_endpoint="http://localhost:9000", @@ -317,3 +319,85 @@ class TestIcebergDocument: assert iceberg_document.get_count() == len(sample_items), ( "get_count should return the same number as the length of sample_items" ) + + def test_state_materialization_round_trip(self): + operator_uuid = str(uuid.uuid4()).replace("-", "") + result_uri = VFSURIFactory.create_result_uri( + WorkflowIdentity(id=0), + ExecutionIdentity(id=0), + GlobalPortIdentity( + op_id=PhysicalOpIdentity( + logical_op_id=OperatorIdentity(id=f"test_state_{operator_uuid}"), + layer_name="main", + ), + port_id=PortIdentity(id=0), + input=False, + ), + ) + state_uri = 
State.uri_from_result_uri(result_uri) + DocumentFactory.create_document(state_uri, State.SCHEMA) + document, _ = DocumentFactory.open_document(state_uri) + + state = State( + { + "loop_counter": 3, + "name": "outer-loop", + "payload": b"\x00\x01state-bytes", + "nested": {"enabled": True, "values": [1, 2, 3]}, + } + ) + + writer = document.writer(str(uuid.uuid4())) + writer.open() + writer.put_one(state.to_tuple()) + writer.close() + + stored_rows = list(document.get()) + assert len(stored_rows) == 1 + assert State.from_tuple(stored_rows[0]) == state + + def test_multiple_states_materialize_as_rows_in_one_table(self): + operator_uuid = str(uuid.uuid4()).replace("-", "") + result_uri = VFSURIFactory.create_result_uri( + WorkflowIdentity(id=0), + ExecutionIdentity(id=0), + GlobalPortIdentity( + op_id=PhysicalOpIdentity( + logical_op_id=OperatorIdentity( + id=f"test_multiple_states_{operator_uuid}" + ), + layer_name="main", + ), + port_id=PortIdentity(id=0), + input=False, + ), + ) + state_uri = State.uri_from_result_uri(result_uri) + DocumentFactory.create_document(state_uri, State.SCHEMA) + document, _ = DocumentFactory.open_document(state_uri) + + states = [ + State({"loop_counter": 0, "i": 1, "payload": b"first"}), + State( + { + "loop_counter": 1, + "i": 2, + "payload": b"second", + "nested": {"values": [3, 4]}, + } + ), + ] + + writer = document.writer(str(uuid.uuid4())) + writer.open() + for state in states: + writer.put_one(state.to_tuple()) + writer.close() + + stored_rows = list(document.get()) + assert len(stored_rows) == len(states) + actual_states = sorted( + [State.from_tuple(row) for row in stored_rows], + key=lambda state: state["loop_counter"], + ) + assert actual_states == states diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py index e49c0316cc..bc2f069157 100644 --- 
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py +++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py @@ -17,8 +17,8 @@ import typing from loguru import logger -from pyarrow import Table from typing import Union +from pyarrow import Table from core.architecture.sendsemantics.broad_cast_partitioner import ( BroadcastPartitioner, @@ -34,7 +34,7 @@ from core.architecture.sendsemantics.range_based_shuffle_partitioner import ( from core.architecture.sendsemantics.round_robin_partitioner import ( RoundRobinPartitioner, ) -from core.models import Tuple, InternalQueue, DataFrame, DataPayload +from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, StateFrame from core.models.internal_queue import DataElement, ECMElement from core.storage.document_factory import DocumentFactory from core.util import Stoppable, get_one_of @@ -125,6 +125,15 @@ class InputPortMaterializationReaderRunnable(Runnable, Stoppable): if receiver == self.worker_actor_id: yield self.tuples_to_data_frame(tuples) + def emit_state_with_filter(self, state: State) -> typing.Iterator[DataPayload]: + for receiver, payload in self.partitioner.flush_state(state): + if receiver == self.worker_actor_id: + yield ( + StateFrame(payload) + if isinstance(payload, State) + else self.tuples_to_data_frame(payload) + ) + def run(self) -> None: """ Main execution logic that reads tuples from the materialized storage and @@ -138,8 +147,21 @@ class InputPortMaterializationReaderRunnable(Runnable, Stoppable): self.uri ) self.emit_ecm("StartChannel", EmbeddedControlMessageType.NO_ALIGNMENT) - storage_iterator = self.materialization.get() + try: + state_document, _ = DocumentFactory.open_document( + State.uri_from_result_uri(self.uri) + ) + state_iterator = state_document.get() + for state in state_iterator: + for state_frame in self.emit_state_with_filter( + State.from_tuple(state) + ): + self.emit_payload(state_frame) 
+ except ValueError: + pass + + storage_iterator = self.materialization.get() # Iterate and process tuples. for tup in storage_iterator: if self._stopped: diff --git a/amber/src/main/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py new file mode 100644 index 0000000000..3662d023f5 --- /dev/null +++ b/amber/src/main/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py @@ -0,0 +1,190 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from unittest.mock import MagicMock, patch + +import pytest + +from core.models import State, StateFrame, Tuple +from core.models.internal_queue import DataElement +from core.models.schema import Schema +from core.storage.runnables.input_port_materialization_reader_runnable import ( + InputPortMaterializationReaderRunnable, +) +from proto.org.apache.texera.amber.core import ( + ActorVirtualIdentity, + ChannelIdentity, +) + + +class TestEmitStateWithFilter: + """Cover the partitioner-filter logic for state payloads in + InputPortMaterializationReaderRunnable. These tests bypass __init__ + so we don't need a real partitioner or storage URI. 
+ """ + + @pytest.fixture + def me(self): + return ActorVirtualIdentity(name="me") + + @pytest.fixture + def someone_else(self): + return ActorVirtualIdentity(name="other") + + @pytest.fixture + def runnable(self, me): + # __new__ skips __init__ so we can wire only the fields we need. + instance = InputPortMaterializationReaderRunnable.__new__( + InputPortMaterializationReaderRunnable + ) + instance.worker_actor_id = me + instance.partitioner = MagicMock() + instance.tuple_schema = Schema(raw_schema={"x": "INTEGER"}) + return instance + + def test_yields_state_frame_for_matching_receiver(self, runnable, me): + state = State({"k": 1}) + runnable.partitioner.flush_state.return_value = [(me, state)] + + frames = list(runnable.emit_state_with_filter(state)) + + assert len(frames) == 1 + assert isinstance(frames[0], StateFrame) + assert frames[0].frame is state + + def test_filters_out_non_matching_receivers(self, runnable, me, someone_else): + state = State({"k": 1}) + runnable.partitioner.flush_state.return_value = [ + (someone_else, state), + (me, state), + (someone_else, state), + ] + + frames = list(runnable.emit_state_with_filter(state)) + + assert len(frames) == 1 + assert isinstance(frames[0], StateFrame) + + def test_yields_data_frame_for_non_state_payload(self, runnable, me): + # When the partitioner produces a tuple-batch payload (BroadcastPartitioner + # case), the runnable must convert it to a DataFrame instead of wrapping + # it as a StateFrame. + state = State({"k": 1}) + tuples = [Tuple({"x": 7}, schema=runnable.tuple_schema)] + runnable.partitioner.flush_state.return_value = [(me, tuples)] + + frames = list(runnable.emit_state_with_filter(state)) + + assert len(frames) == 1 + # Should not be wrapped as a StateFrame. 
+ assert not isinstance(frames[0], StateFrame) + assert frames[0].frame.num_rows == 1 + + def test_empty_partitioner_output_yields_nothing(self, runnable): + state = State({}) + runnable.partitioner.flush_state.return_value = [] + + assert list(runnable.emit_state_with_filter(state)) == [] + + +class TestRunStateReadingBlock: + """Cover the inner try-block in run() that opens the state document and + emits its rows as StateFrames. + """ + + @pytest.fixture + def me(self): + return ActorVirtualIdentity(name="me") + + @pytest.fixture + def runnable(self, me): + instance = InputPortMaterializationReaderRunnable.__new__( + InputPortMaterializationReaderRunnable + ) + instance.uri = "vfs:///wf/0/exec/0/result/op-a" + instance.worker_actor_id = me + instance.tuple_schema = Schema(raw_schema={"x": "INTEGER"}) + instance._stopped = False + instance._finished = False + instance.channel_id = ChannelIdentity(me, me, is_control=False) + instance.queue = MagicMock() + instance.partitioner = MagicMock() + # No tuple-batches and no ECM-flush payloads in these tests. + instance.partitioner.flush.return_value = [] + return instance + + def test_state_rows_are_emitted_as_state_frames(self, runnable, me): + state_a = State({"loop_counter": 0}) + state_b = State({"loop_counter": 1}) + + # The state document yields opaque tuples; from_tuple deserializes + # them. Patch from_tuple so we don't have to wire a real serialization. + result_doc = MagicMock() + result_doc.get.return_value = iter([]) # No materialized tuples. 
+ state_doc = MagicMock() + state_doc.get.return_value = iter(["row-a", "row-b"]) + + with ( + patch( + "core.storage.runnables.input_port_materialization_reader_runnable.DocumentFactory" + ) as mock_factory, + patch.object(State, "from_tuple") as mock_from_tuple, + ): + mock_factory.open_document.side_effect = [ + (result_doc, runnable.tuple_schema), + (state_doc, None), + ] + mock_from_tuple.side_effect = [state_a, state_b] + runnable.partitioner.flush_state.side_effect = [ + [(me, state_a)], + [(me, state_b)], + ] + + runnable.run() + + # Two StateFrames must have been put on the queue, in order. + state_frames = [ + call.args[0] + for call in runnable.queue.put.call_args_list + if isinstance(call.args[0], DataElement) + and isinstance(call.args[0].payload, StateFrame) + ] + assert [sf.payload.frame for sf in state_frames] == [state_a, state_b] + assert runnable._finished is True + + def test_missing_state_document_does_not_abort_run(self, runnable): + # The inner try is meant to swallow ValueError when no state document + # is provisioned; the outer run() should still finish cleanly. + result_doc = MagicMock() + result_doc.get.return_value = iter([]) + + with patch( + "core.storage.runnables.input_port_materialization_reader_runnable.DocumentFactory" + ) as mock_factory: + mock_factory.open_document.side_effect = [ + (result_doc, runnable.tuple_schema), + ValueError("no storage"), + ] + + runnable.run() + + assert runnable._finished is True + # No StateFrames should have been emitted. 
+ for call in runnable.queue.put.call_args_list: + element = call.args[0] + if isinstance(element, DataElement): + assert not isinstance(element.payload, StateFrame) diff --git a/amber/src/main/python/core/storage/storage_config.py b/amber/src/main/python/core/storage/storage_config.py index 0e47bdb71a..8233590987 100644 --- a/amber/src/main/python/core/storage/storage_config.py +++ b/amber/src/main/python/core/storage/storage_config.py @@ -32,6 +32,7 @@ class StorageConfig: ICEBERG_REST_CATALOG_URI = None ICEBERG_REST_CATALOG_WAREHOUSE_NAME = None ICEBERG_TABLE_RESULT_NAMESPACE = None + ICEBERG_TABLE_STATE_NAMESPACE = None ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None ICEBERG_TABLE_COMMIT_BATCH_SIZE = None @@ -51,6 +52,7 @@ class StorageConfig: rest_catalog_uri, rest_catalog_warehouse_name, table_result_namespace, + table_state_namespace, directory_path, commit_batch_size, s3_endpoint, @@ -71,6 +73,7 @@ class StorageConfig: cls.ICEBERG_REST_CATALOG_WAREHOUSE_NAME = rest_catalog_warehouse_name cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace + cls.ICEBERG_TABLE_STATE_NAMESPACE = table_state_namespace cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size) diff --git a/amber/src/main/python/core/storage/test_document_factory.py b/amber/src/main/python/core/storage/test_document_factory.py new file mode 100644 index 0000000000..859c004024 --- /dev/null +++ b/amber/src/main/python/core/storage/test_document_factory.py @@ -0,0 +1,134 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from unittest.mock import MagicMock, patch + +import pytest + +from core.models import Schema +from core.storage.document_factory import DocumentFactory +from core.storage.storage_config import StorageConfig +from core.storage.vfs_uri_factory import VFSResourceType + + +# Avoid initializing the real config (only initializable once per process). +StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE = "test-result-ns" +StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE = "test-state-ns" + +VFS_URI = "vfs:///wid/0/eid/0/opid/test/main/0/0/result" + + [email protected] +def schema(): + return Schema(raw_schema={"x": "INTEGER"}) + + +def _decode_returning(resource_type): + """Helper: build a VFSURIFactory.decode_uri side_effect.""" + return lambda _uri: (None, None, None, resource_type) + + +@patch("core.storage.document_factory.IcebergDocument") +@patch("core.storage.document_factory.amber_schema_to_iceberg_schema") +@patch("core.storage.document_factory.create_table") +@patch("core.storage.document_factory.IcebergCatalogInstance") +@patch("core.storage.document_factory.VFSURIFactory") +class TestCreateDocumentNamespaceRouting: + def test_state_resource_type_uses_state_namespace( + self, mock_vfs, _icb, mock_create_table, _amber_schema, _doc, schema + ): + mock_vfs.VFS_FILE_URI_SCHEME = "vfs" + mock_vfs.decode_uri.side_effect = _decode_returning(VFSResourceType.STATE) + + DocumentFactory.create_document(VFS_URI, schema) + + args, _ = mock_create_table.call_args + assert args[1] == StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE + + def 
test_result_resource_type_uses_result_namespace( + self, mock_vfs, _icb, mock_create_table, _amber_schema, _doc, schema + ): + mock_vfs.VFS_FILE_URI_SCHEME = "vfs" + mock_vfs.decode_uri.side_effect = _decode_returning(VFSResourceType.RESULT) + + DocumentFactory.create_document(VFS_URI, schema) + + args, _ = mock_create_table.call_args + assert args[1] == StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE + + def test_unsupported_resource_type_raises_value_error( + self, mock_vfs, _icb, _create_table, _amber_schema, _doc, schema + ): + mock_vfs.VFS_FILE_URI_SCHEME = "vfs" + # CONSOLE_MESSAGES has no namespace mapping in the Python factory. + mock_vfs.decode_uri.side_effect = _decode_returning( + VFSResourceType.CONSOLE_MESSAGES + ) + + with pytest.raises(ValueError, match="not supported"): + DocumentFactory.create_document(VFS_URI, schema) + + +def test_create_document_rejects_non_vfs_scheme(schema): + with pytest.raises(NotImplementedError, match="Unsupported URI scheme"): + DocumentFactory.create_document("file:///tmp/x", schema) + + +@patch("core.storage.document_factory.IcebergDocument") +@patch("core.storage.document_factory.Schema") +@patch("core.storage.document_factory.load_table_metadata") +@patch("core.storage.document_factory.IcebergCatalogInstance") +@patch("core.storage.document_factory.VFSURIFactory") +class TestOpenDocumentNamespaceRouting: + @staticmethod + def _stub_table(): + table = MagicMock() + table.schema.return_value.as_arrow.return_value = MagicMock() + return table + + def test_state_resource_type_uses_state_namespace( + self, mock_vfs, _icb, mock_load, _schema_cls, _doc + ): + mock_vfs.VFS_FILE_URI_SCHEME = "vfs" + mock_vfs.decode_uri.side_effect = _decode_returning(VFSResourceType.STATE) + mock_load.return_value = self._stub_table() + + DocumentFactory.open_document(VFS_URI) + + args, _ = mock_load.call_args + assert args[1] == StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE + + def test_unsupported_resource_type_raises_value_error( + self, 
mock_vfs, _icb, _load, _schema_cls, _doc + ): + mock_vfs.VFS_FILE_URI_SCHEME = "vfs" + mock_vfs.decode_uri.side_effect = _decode_returning( + VFSResourceType.CONSOLE_MESSAGES + ) + + with pytest.raises(ValueError, match="not supported"): + DocumentFactory.open_document(VFS_URI) + + def test_missing_table_raises_value_error( + self, mock_vfs, _icb, mock_load, _schema_cls, _doc + ): + mock_vfs.VFS_FILE_URI_SCHEME = "vfs" + mock_vfs.decode_uri.side_effect = _decode_returning(VFSResourceType.STATE) + mock_load.return_value = None + + with pytest.raises(ValueError, match="No storage is found"): + DocumentFactory.open_document(VFS_URI) diff --git a/amber/src/main/python/core/storage/vfs_uri_factory.py b/amber/src/main/python/core/storage/vfs_uri_factory.py index de0c5db56e..0e23e60705 100644 --- a/amber/src/main/python/core/storage/vfs_uri_factory.py +++ b/amber/src/main/python/core/storage/vfs_uri_factory.py @@ -34,6 +34,7 @@ class VFSResourceType(str, Enum): RESULT = "result" RUNTIME_STATISTICS = "runtimeStatistics" CONSOLE_MESSAGES = "consoleMessages" + STATE = "state" class VFSURIFactory: diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py index 64c7080e52..1942e91f8b 100644 --- a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py +++ b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py @@ -34,6 +34,7 @@ class TestLargeBinaryManager: rest_catalog_uri="http://localhost:8181/catalog/", rest_catalog_warehouse_name="texera", table_result_namespace="test", + table_state_namespace="test-state", directory_path="/tmp/test", commit_batch_size=1000, s3_endpoint="http://localhost:9000", diff --git a/amber/src/main/python/texera_run_python_worker.py b/amber/src/main/python/texera_run_python_worker.py index 8687298f81..9b21fa5334 100644 --- a/amber/src/main/python/texera_run_python_worker.py +++ b/amber/src/main/python/texera_run_python_worker.py @@ 
-52,6 +52,7 @@ if __name__ == "__main__": iceberg_rest_catalog_uri, iceberg_rest_catalog_warehouse_name, iceberg_table_namespace, + iceberg_table_state_namespace, iceberg_file_storage_directory_path, iceberg_table_commit_batch_size, s3_endpoint, @@ -68,6 +69,7 @@ if __name__ == "__main__": iceberg_rest_catalog_uri, iceberg_rest_catalog_warehouse_name, iceberg_table_namespace, + iceberg_table_state_namespace, iceberg_file_storage_directory_path, iceberg_table_commit_batch_size, s3_endpoint, diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 4ab3d18056..80f22ace79 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -124,6 +124,9 @@ class OutputManager( : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] = mutable.HashMap() + private val stateWriters: mutable.HashMap[PortIdentity, BufferedItemWriter[Tuple]] = + mutable.HashMap() + /** * Add down stream operator and its corresponding Partitioner. * @@ -232,6 +235,10 @@ class OutputManager( }) } + def saveStateToStorageIfNeeded(state: State): Unit = { + stateWriters.values.foreach(_.putOne(state.toTuple)) + } + /** * Singal the port storage writer to flush the remaining buffer and wait for commits to finish so that * the output port is properly completed. If the output port does not need storage, no action will be done. 
@@ -245,7 +252,7 @@ class OutputManager( writerThread.join() case None => } - + this.stateWriters.remove(outputPortId).foreach(_.close()) } def getPort(portId: PortIdentity): WorkerPort = ports(portId) @@ -288,6 +295,16 @@ class OutputManager( val writerThread = new OutputPortResultWriterThread(bufferedItemWriter) this.outputPortResultWriterThreads(portId) = writerThread writerThread.start() + + // The state document is provisioned alongside the result document + // by RegionExecutionCoordinator, so it is always present. + val stateWriter = DocumentFactory + .openDocument(State.uriFromResultUri(storageUri)) + ._1 + .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) + .asInstanceOf[BufferedItemWriter[Tuple]] + stateWriter.open() + this.stateWriters(portId) = stateWriter } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala index 4ff5ff15ae..3358e31e65 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala @@ -187,6 +187,7 @@ class PythonWorkflowWorker( if (isRest) StorageConfig.icebergRESTCatalogUri else "", if (isRest) StorageConfig.icebergRESTCatalogWarehouseName else "", StorageConfig.icebergTableResultNamespace, + StorageConfig.icebergTableStateNamespace, StorageConfig.fileStorageDirectoryPath.toString, StorageConfig.icebergTableCommitBatchSize.toString, StorageConfig.s3Endpoint, diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 254c16bf34..58fdf9f242 100644 --- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -21,6 +21,7 @@ package org.apache.texera.amber.engine.architecture.scheduling import org.apache.pekko.pattern.gracefulStop import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer, Return, Throw, Timer} +import org.apache.texera.amber.core.state.State import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity @@ -569,12 +570,14 @@ class RegionExecutionCoordinator( portConfigs.foreach { case (outputPortId, portConfig) => val storageUriToAdd = portConfig.storageURI + val stateUriToAdd = State.uriFromResultUri(storageUriToAdd) val (_, eid, _, _) = decodeURI(storageUriToAdd) val schemaOptional = region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3 val schema = schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing")) DocumentFactory.createDocument(storageUriToAdd, schema) + DocumentFactory.createDocument(stateUriToAdd, State.schema) if (!isRestart) { WorkflowExecutionsResource.insertOperatorPortResultUri( eid = eid, diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index 84f1e8ec65..b6c0c39aaf 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -126,6 +126,7 @@ class DataProcessor( val outputState = executor.processState(state, port) if (outputState.isDefined) { outputManager.emitState(outputState.get) + 
outputManager.saveStateToStorageIfNeeded(outputState.get) } } catch safely { case e => diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala index 10fbbc44a2..90de86e1fd 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala @@ -21,6 +21,7 @@ package org.apache.texera.amber.engine.architecture.worker.managers import io.grpc.MethodDescriptor import org.apache.texera.amber.config.ApplicationConfig +import org.apache.texera.amber.core.state.State import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.storage.model.VirtualDocument import org.apache.texera.amber.core.tuple.Tuple @@ -45,7 +46,11 @@ import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{ DPInputQueueElement, FIFOMessageElement } -import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage} +import org.apache.texera.amber.engine.common.ambermessage.{ + DataFrame, + StateFrame, + WorkflowFIFOMessage +} import org.apache.texera.amber.util.VirtualIdentityUtils.getFromActorIdForInputPortStorage import java.net.URI @@ -106,6 +111,25 @@ class InputPortMaterializationReaderThread( } // Flush any remaining tuples in the buffer. 
if (buffer.nonEmpty) flush() + + try { + val state_document = + DocumentFactory + .openDocument(State.uriFromResultUri(uri)) + ._1 + .asInstanceOf[VirtualDocument[Tuple]] + val stateReadIterator = state_document.get() + + while (stateReadIterator.hasNext) { + val state = State.fromTuple(stateReadIterator.next()) + inputMessageQueue.put( + FIFOMessageElement(WorkflowFIFOMessage(channelId, getSequenceNumber, StateFrame(state))) + ) + } + } catch { + case _: Exception => + } + emitECM(METHOD_END_CHANNEL, PORT_ALIGNMENT) isFinished.set(true) } catch { diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf index 1f39359155..da2f7ccc19 100644 --- a/common/config/src/main/resources/storage.conf +++ b/common/config/src/main/resources/storage.conf @@ -61,6 +61,9 @@ storage { runtime-statistics-namespace = "workflow-runtime-statistics" runtime-statistics-namespace = ${?STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE} + state-namespace = "operator-port-state" + state-namespace = ${?STORAGE_ICEBERG_TABLE_STATE_NAMESPACE} + commit { batch-size = 4096 # decide the buffer size of our IcebergTableWriter batch-size = ${?STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE} diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala index 9ec52bba65..123c56505e 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala @@ -67,6 +67,7 @@ object EnvironmentalVariable { "STORAGE_ICEBERG_TABLE_CONSOLE_MESSAGES_NAMESPACE" val ENV_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE = "STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE" + val ENV_ICEBERG_TABLE_STATE_NAMESPACE = "STORAGE_ICEBERG_TABLE_STATE_NAMESPACE" val ENV_ICEBERG_TABLE_COMMIT_BATCH_SIZE = 
"STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE" val ENV_ICEBERG_TABLE_COMMIT_NUM_RETRIES = "STORAGE_ICEBERG_TABLE_COMMIT_NUM_RETRIES" val ENV_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS = "STORAGE_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS" diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala index 728e3c0c2d..07447cfdbe 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala @@ -54,6 +54,8 @@ object StorageConfig { conf.getString("storage.iceberg.table.console-messages-namespace") val icebergTableRuntimeStatisticsNamespace: String = conf.getString("storage.iceberg.table.runtime-statistics-namespace") + val icebergTableStateNamespace: String = + conf.getString("storage.iceberg.table.state-namespace") val icebergTableCommitBatchSize: Int = conf.getInt("storage.iceberg.table.commit.batch-size") val icebergTableCommitNumRetries: Int = @@ -111,6 +113,7 @@ object StorageConfig { "STORAGE_ICEBERG_TABLE_CONSOLE_MESSAGES_NAMESPACE" val ENV_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE = "STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE" + val ENV_ICEBERG_TABLE_STATE_NAMESPACE = "STORAGE_ICEBERG_TABLE_STATE_NAMESPACE" val ENV_ICEBERG_TABLE_COMMIT_BATCH_SIZE = "STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE" val ENV_ICEBERG_TABLE_COMMIT_NUM_RETRIES = "STORAGE_ICEBERG_TABLE_COMMIT_NUM_RETRIES" val ENV_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS = "STORAGE_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS" diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala index ba146f1d57..532f355c17 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala @@ 
-23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} import org.apache.texera.amber.util.JSONUtils.objectMapper +import java.net.URI import java.util.Base64 import scala.jdk.CollectionConverters.IteratorHasAsScala @@ -57,6 +58,9 @@ object State { def fromTuple(row: Tuple): State = fromJson(row.getField[String](Content)) + def uriFromResultUri(resultUri: URI): URI = + new URI(resultUri.toString.replace("/result", "/state")) + private def toJsonValue(value: Any): Any = value match { case null => null diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala index 15949ef471..00f6c70ba7 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala @@ -72,6 +72,7 @@ object DocumentFactory { case RESULT => StorageConfig.icebergTableResultNamespace case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case STATE => StorageConfig.icebergTableStateNamespace case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } @@ -119,6 +120,7 @@ object DocumentFactory { case RESULT => StorageConfig.icebergTableResultNamespace case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case STATE => StorageConfig.icebergTableStateNamespace case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala index 0fbee64457..e687b28a29 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala @@ -34,6 +34,7 @@ object VFSResourceType extends Enumeration { val RESULT: Value = Value("result") val RUNTIME_STATISTICS: Value = Value("runtimeStatistics") val CONSOLE_MESSAGES: Value = Value("consoleMessages") + val STATE: Value = Value("state") } object VFSURIFactory { diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala index 8fdf039f3e..7f1d8573c2 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala @@ -20,6 +20,7 @@ package org.apache.texera.amber.storage.result.iceberg import org.apache.texera.amber.config.StorageConfig +import org.apache.texera.amber.core.state.State import org.apache.texera.amber.core.storage.model.{VirtualDocument, VirtualDocumentSpec} import org.apache.texera.amber.core.storage.{DocumentFactory, IcebergCatalogInstance, VFSURIFactory} import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} @@ -141,6 +142,84 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter } } + it should "round trip materialized state documents" in { + val stateUri = State.uriFromResultUri(uri) + DocumentFactory.createDocument(stateUri, State.schema) + val stateDocument = + DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]] + val state = State( + Map( + "loop_counter" -> 3, + "name" -> "outer-loop", + 
"payload" -> Array[Byte](0, 1, 2, 3), + "nested" -> Map("enabled" -> true, "values" -> List(1, 2, 3)) + ) + ) + + val writer = stateDocument.writer(UUID.randomUUID().toString) + writer.open() + writer.putOne(state.toTuple) + writer.close() + + val storedRows = stateDocument.get().toList + assert(storedRows.length == 1) + val deserialized = State.fromTuple(storedRows.head).values + assert(deserialized("loop_counter") == 3L) + assert(deserialized("name") == "outer-loop") + assert(deserialized("payload").asInstanceOf[Array[Byte]].sameElements(Array[Byte](0, 1, 2, 3))) + assert(deserialized("nested").asInstanceOf[Map[String, Any]]("enabled") == true) + assert(deserialized("nested").asInstanceOf[Map[String, Any]]("values") == List(1L, 2L, 3L)) + } + + it should "materialize multiple states as rows in one state table" in { + val stateUri = State.uriFromResultUri(uri) + DocumentFactory.createDocument(stateUri, State.schema) + val stateDocument = + DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]] + val states: List[State] = List( + State(Map("loop_counter" -> 0, "i" -> 1, "payload" -> Array[Byte](1, 2, 3))), + State( + Map( + "loop_counter" -> 1, + "i" -> 2, + "payload" -> Array[Byte](4, 5, 6), + "nested" -> Map("values" -> List(3, 4)) + ) + ) + ) + + val writer = stateDocument.writer(UUID.randomUUID().toString) + writer.open() + states.foreach(state => writer.putOne(state.toTuple)) + writer.close() + + val deserializedStates = + stateDocument + .get() + .toList + .map(State.fromTuple) + .sortBy(_.values("loop_counter").asInstanceOf[Long]) + assert(deserializedStates.length == states.length) + deserializedStates.zip(states).foreach { + case (actual, expected) => + assert( + actual.values("loop_counter") == expected.values("loop_counter").asInstanceOf[Int].toLong + ) + assert(actual.values("i") == expected.values("i").asInstanceOf[Int].toLong) + assert( + actual + .values("payload") + .asInstanceOf[Array[Byte]] + 
.sameElements(expected.values("payload").asInstanceOf[Array[Byte]]) + ) + } + assert( + deserializedStates(1) + .values("nested") + .asInstanceOf[Map[String, Any]]("values") == List(3L, 4L) + ) + } + /** Returns a dynamic proxy for `realTable` that increments `counter` on every `refresh()` call. */ private def tableWithRefreshSpy(realTable: Table, counter: AtomicInteger): Table = Proxy
