This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
new 62d285443c update
62d285443c is described below
commit 62d285443ccde08478fe426a7636f1c356d38165
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Apr 29 16:19:23 2026 -0700
update
---
.../main/python/core/runnables/data_processor.py | 26 +--
amber/src/main/python/core/runnables/main_loop.py | 8 +-
.../main/python/core/runnables/test_main_loop.py | 175 +++++++++++++++++++--
3 files changed, 177 insertions(+), 32 deletions(-)
diff --git a/amber/src/main/python/core/runnables/data_processor.py
b/amber/src/main/python/core/runnables/data_processor.py
index 815e85a644..776aa35b87 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -49,16 +49,14 @@ class DataProcessor(Runnable, Stoppable):
with self._context.tuple_processing_manager.context_switch_condition:
self._context.tuple_processing_manager.context_switch_condition.wait()
self._running.set()
- self._switch_context()
while self._running.is_set():
- marker =
self._context.tuple_processing_manager.get_internal_marker()
- state = self._context.state_processing_manager.get_input_state()
- tuple_ = self._context.tuple_processing_manager.current_input_tuple
- if marker is not None:
- self.process_internal_marker(marker)
- elif state is not None:
- self.process_state(state)
- elif tuple_ is not None:
+ tpm = self._context.tuple_processing_manager
+ spm = self._context.state_processing_manager
+ if tpm.current_internal_marker is not None:
+ self.process_internal_marker(tpm.get_internal_marker())
+ elif spm.current_input_state is not None:
+ self.process_state(spm.get_input_state())
+ elif tpm.current_input_tuple is not None:
self.process_tuple()
else:
raise RuntimeError("No marker or tuple to process.")
@@ -85,9 +83,6 @@ class DataProcessor(Runnable, Stoppable):
self._context.exception_manager.set_exception_info(exc_info)
self._report_exception(exc_info)
- finally:
- self._switch_context()
-
def process_state(self, state: State) -> None:
"""
Process an input marker by invoking appropriate state
@@ -100,7 +95,6 @@ class DataProcessor(Runnable, Stoppable):
self._context.worker_id,
self._context.console_message_manager.print_buf,
):
- self._switch_context()
self._set_output_state(executor.process_state(state, port_id))
except Exception as err:
@@ -109,9 +103,6 @@ class DataProcessor(Runnable, Stoppable):
self._context.exception_manager.set_exception_info(exc_info)
self._report_exception(exc_info)
- finally:
- self._switch_context()
-
def process_tuple(self) -> None:
"""
Process an input tuple by invoking the executor's tuple processing
method.
@@ -134,9 +125,6 @@ class DataProcessor(Runnable, Stoppable):
self._context.exception_manager.set_exception_info(exc_info)
self._report_exception(exc_info)
- finally:
- self._switch_context()
-
def _set_output_tuple(self, output_iterator:
Iterator[Optional[TupleLike]]) -> None:
"""
Set the output tuple after processing by the executor.
diff --git a/amber/src/main/python/core/runnables/main_loop.py
b/amber/src/main/python/core/runnables/main_loop.py
index d42aedc7e4..8454808b05 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -225,9 +225,13 @@ class MainLoop(StoppableQueueBlockingRunnable):
)
def process_input_state(self) -> None:
+ # Single switch handshake: DataProc parks at the end-of-body switch of
+ # DataProcessor.run()'s while-loop between tasks, so one switch from
+ # MainLoop drives a full pick-up -> executor -> output -> park-back
+ # cycle. By the time the switch returns, current_output_state holds
+ # the freshly produced output.
self._switch_context()
output_state = self.context.state_processing_manager.get_output_state()
- self._switch_context()
if output_state is not None:
if isinstance(self.context.executor_manager.executor,
LoopEndOperator):
self.context.output_manager.reset_output_storage()
@@ -287,7 +291,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
def _process_state(self, state_: State) -> None:
self.context.state_processing_manager.current_input_state = state_
- self._switch_context()
self.process_input_state()
self._check_and_process_control()
@@ -376,7 +379,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
if ecm.ecm_type != EmbeddedControlMessageType.NO_ALIGNMENT:
self.context.pause_manager.resume(PauseType.ECM_PAUSE)
- self._switch_context()
if self.context.tuple_processing_manager.current_internal_marker:
{
StartChannel: self._process_start_channel,
diff --git a/amber/src/main/python/core/runnables/test_main_loop.py
b/amber/src/main/python/core/runnables/test_main_loop.py
index 62065e1b8c..d99049a9a3 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -361,6 +361,16 @@ class TestMainLoop:
)
return DCMElement(tag=mock_control_input_channel, payload=payload)
+ @pytest.fixture
+ def mock_state_data_elements(self, mock_data_input_channel):
+ return [
+ DataElement(
+ tag=mock_data_input_channel,
+ payload=StateFrame(frame={"value": value}),
+ )
+ for value in (1, 2, 3, 4)
+ ]
+
@pytest.fixture
def mock_initialize_batch_count_executor(
self,
@@ -1101,17 +1111,15 @@ class TestMainLoop:
lambda state: [(mock_data_output_channel.to_worker_id,
StateFrame(state))],
)
- switch_count = {"value": 0}
-
def fake_switch_context():
- switch_count["value"] += 1
- # xinyuan-state-only still uses the original two-switch state
handshake:
- # the DataProcessor produces output during the first switch of each
- # process_input_state() call, before MainLoop reads
current_output_state.
- if switch_count["value"] % 2 == 1:
- current_input_state = (
-
main_loop.context.state_processing_manager.current_input_state
- )
+ # process_input_state now uses a single switch per call, mirroring
+ # the per-iteration switch in process_tuple_with_udf. Each switch
+ # simulates DataProc consuming the queued input state and writing
+ # current_output_state.
+ current_input_state = (
+ main_loop.context.state_processing_manager.current_input_state
+ )
+ if current_input_state is not None:
main_loop.context.state_processing_manager.current_output_state = (
DummyExecutor.process_state(current_input_state, 0)
)
@@ -1290,3 +1298,150 @@ class TestMainLoop:
"test-1"
] == b"pickle " + pickle.dumps(mock_binary_tuple["test-1"])
reraise()
+
+ @pytest.mark.timeout(5)
+ def test_main_loop_thread_can_process_state(
+ self,
+ mock_data_output_channel,
+ mock_control_output_channel,
+ input_queue,
+ output_queue,
+ main_loop,
+ main_loop_thread,
+ mock_assign_input_port,
+ mock_assign_output_port,
+ mock_add_input_channel,
+ mock_add_partitioning,
+ mock_initialize_executor,
+ mock_data_element,
+ mock_state_data_elements,
+ mock_end_of_upstream,
+ command_sequence,
+ reraise,
+ ):
+ # End-to-end coverage of the state-processing path through the real
+ # MainLoop + DataProcessor threads.
+ #
+ # The cooperative-threading handshake works like this:
+ # - DataProcessor.run() peeks current_internal_marker /
+ # current_input_state / current_input_tuple every iteration and
+ # consumes only the slot whose branch it takes -- unhandled
+ # inputs survive into the next iteration.
+ # - process_state runs the executor inside replace_print() and
+ # writes the result to current_output_state; control returns to
+ # MainLoop at the run-loop's end-of-body _switch_context().
+ # - MainLoop.process_input_state() switches once and then reads
+ # current_output_state, so the read sees the value DataProc has
+ # just written.
+ #
+ # The expected behavior is therefore: each state produces its own
+ # output in its own cycle (no lag), and an EndChannel ECM after
+ # the last state produces an additional output via
+ # produce_state_on_finish.
+ main_loop_thread.start()
+
+ for setup_msg in [
+ mock_assign_input_port,
+ mock_assign_output_port,
+ mock_add_input_channel,
+ mock_add_partitioning,
+ mock_initialize_executor,
+ ]:
+ input_queue.put(setup_msg)
+ assert output_queue.get() == DCMElement(
+ tag=mock_control_output_channel,
+ payload=DirectControlMessagePayloadV2(
+ return_invocation=ReturnInvocation(
+ command_id=command_sequence,
+ return_value=ControlReturn(empty_return=EmptyReturn()),
+ )
+ ),
+ )
+
+ # Replace the EchoOperator that mock_initialize_executor loaded with
+ # an in-process executor that tags processed states and emits a
+ # finish marker on EndChannel. Going through the InitializeExecutor
+ # RPC above sets up the rest of the worker state (output schema,
+ # partitioning bookkeeping); swapping the executor instance here
+ # lets the test observe whether process_state actually runs without
+ # depending on Python's cross-test module caching for the loaded
+ # operator class.
+ class StateProcessingExecutor:
+ @staticmethod
+ def process_tuple(tuple_, port):
+ yield tuple_
+
+ @staticmethod
+ def process_state(state, port):
+ return {**state, "processed_marker": "executed", "port": port}
+
+ @staticmethod
+ def produce_state_on_finish(port):
+ return {"finish_marker": "produce_state_on_finish_ran"}
+
+ @staticmethod
+ def on_finish(port):
+ yield
+
+ @staticmethod
+ def close():
+ pass
+
+ main_loop.context.executor_manager.executor = StateProcessingExecutor()
+
+ # Send four states directly -- no warm-up tuple needed. With the
+ # init switch in DataProc.run() removed, MainLoop's first switch
+ # lands DataProc directly in the while-loop where it processes
+ # the queued state, so even the first state cycle works.
+ for state_element in mock_state_data_elements:
+ input_queue.put(state_element)
+
+ for expected_value in (1, 2, 3, 4):
+ output_data_element: DataElement = output_queue.get()
+ assert output_data_element.tag == mock_data_output_channel
+ assert isinstance(output_data_element.payload, StateFrame), (
+ f"expected StateFrame for value={expected_value}, got "
+ f"{type(output_data_element.payload).__name__}"
+ )
+ output_state = output_data_element.payload.frame
+ assert output_state["value"] == expected_value, (
+ f"state outputs arrived out of order: expected value="
+ f"{expected_value}, got value={output_state['value']}"
+ )
+ assert output_state["processed_marker"] == "executed"
+ assert output_state["port"] == 0
+
+ # Send EndChannel to drive _process_end_channel. The executor's
+ # produce_state_on_finish writes a finish-marker state into
+ # current_output_state inside DataProc's process_internal_marker;
+ # MainLoop's process_input_state then emits it.
+ input_queue.put(mock_end_of_upstream)
+
+ # Drain the control reply messages so the next data
+ # output_queue.get() returns the post-EndChannel data emission.
+ output_queue.disable_data(InternalQueue.DisableType.DISABLE_BY_PAUSE)
+ for _ in range(3):
+ control_reply = output_queue.get()
+ assert isinstance(control_reply, DCMElement), (
+ f"expected DCMElement during EndChannel teardown, got "
+ f"{type(control_reply).__name__}"
+ )
+ output_queue.enable_data(InternalQueue.DisableType.DISABLE_BY_PAUSE)
+
+ end_channel_state_output: DataElement = output_queue.get()
+ assert end_channel_state_output.tag == mock_data_output_channel
+ assert isinstance(end_channel_state_output.payload, StateFrame), (
+ f"expected StateFrame for the EndChannel-driven emission, got "
+ f"{type(end_channel_state_output.payload).__name__}"
+ )
+ end_channel_state = end_channel_state_output.payload.frame
+ assert "finish_marker" in end_channel_state, (
+ f"EndChannel emission should be the finish-marker state from "
+ f"produce_state_on_finish, got {end_channel_state!r}"
+ )
+ assert (
+ end_channel_state["finish_marker"]
+ == "produce_state_on_finish_ran"
+ )
+
+ reraise()