This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new ae68fc992a fix(revert): "fix: add missing context switches for
repeated state processing" (#4552)
ae68fc992a is described below
commit ae68fc992a77a1ec8003c205a4e49396756f3cec
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Apr 28 18:11:06 2026 -0700
fix(revert): "fix: add missing context switches for repeated state
processing" (#4552)
Reverts apache/texera#4424 since it might change the lifecycle of the
Python engine.
The reverted change might be related to issue #4545.
We don't know whether #4424 really causes the issue, but we are
reverting it for testing purposes.
---
.../main/python/core/runnables/data_processor.py | 1 -
amber/src/main/python/core/runnables/main_loop.py | 1 -
.../main/python/core/runnables/test_main_loop.py | 61 ----------------------
3 files changed, 63 deletions(-)
diff --git a/amber/src/main/python/core/runnables/data_processor.py
b/amber/src/main/python/core/runnables/data_processor.py
index 815e85a644..4399b1a3a2 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -100,7 +100,6 @@ class DataProcessor(Runnable, Stoppable):
self._context.worker_id,
self._context.console_message_manager.print_buf,
):
- self._switch_context()
self._set_output_state(executor.process_state(state, port_id))
except Exception as err:
diff --git a/amber/src/main/python/core/runnables/main_loop.py
b/amber/src/main/python/core/runnables/main_loop.py
index 9356542a08..cde2847206 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -247,7 +247,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
def _process_state(self, state_: State) -> None:
self.context.state_processing_manager.current_input_state = state_
- self._switch_context()
self.process_input_state()
self._check_and_process_control()
diff --git a/amber/src/main/python/core/runnables/test_main_loop.py
b/amber/src/main/python/core/runnables/test_main_loop.py
index e6136b0420..5612e4b41a 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -26,8 +26,6 @@ from threading import Thread
from core.models import (
DataFrame,
InternalQueue,
- State,
- StateFrame,
Tuple,
)
from core.models.internal_queue import (
@@ -1038,65 +1036,6 @@ class TestMainLoop:
reraise()
- @pytest.mark.timeout(2)
- def test_process_state_can_emit_multiple_states(
- self,
- main_loop,
- output_queue,
- mock_data_output_channel,
- monkeypatch,
- ):
- class DummyExecutor:
- @staticmethod
- def process_state(state: State, port: int) -> State:
- output_state = State()
- output_state["value"] = state["value"] + 1
- output_state["port"] = port
- return output_state
-
- main_loop.context.executor_manager.executor = DummyExecutor()
- monkeypatch.setattr(main_loop, "_check_and_process_control", lambda:
None)
- monkeypatch.setattr(
- main_loop.context.output_manager,
- "emit_state",
- lambda state: [(mock_data_output_channel.to_worker_id,
StateFrame(state))],
- )
-
- switch_count = {"value": 0}
-
- def fake_switch_context():
- switch_count["value"] += 1
- if switch_count["value"] % 3 == 2:
- current_input_state = (
-
main_loop.context.state_processing_manager.current_input_state
- )
-
main_loop.context.state_processing_manager.current_output_state = (
- DummyExecutor.process_state(current_input_state, 0)
- )
-
- monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
-
- first_state = State()
- first_state["value"] = 1
- second_state = State()
- second_state["value"] = 41
-
- main_loop._process_state(first_state)
- main_loop._process_state(second_state)
-
- first_output: DataElement = output_queue.get()
- second_output: DataElement = output_queue.get()
-
- assert first_output.tag == mock_data_output_channel
- assert isinstance(first_output.payload, StateFrame)
- assert first_output.payload.frame["value"] == 2
- assert first_output.payload.frame["port"] == 0
-
- assert second_output.tag == mock_data_output_channel
- assert isinstance(second_output.payload, StateFrame)
- assert second_output.payload.frame["value"] == 42
- assert second_output.payload.frame["port"] == 0
-
@staticmethod
def send_pause(
command_sequence,