This is an automated email from the ASF dual-hosted git repository. aglinxinyuan pushed a commit to branch loop-feb in repository https://gitbox.apache.org/repos/asf/texera.git
commit aa9fe2d9f084313926bd6bcc766145035381e4ce
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 13 17:28:48 2026 -0700

    fix(loop): address Copilot review on #5700

    - virtual_identity get_worker_index/get_operator_id: use re.fullmatch so
      a malformed worker id (trailing junk) fails loudly, matching the Scala
      VirtualIdentityUtils full-match semantics and the docstring's stated
      intent.
    - RegionExecutionCoordinator.createOutputPortStorageObjects: decide
      reusesOutputStorage per the operator that owns each output port rather
      than region-wide, so a region mixing a reuse op (LoopEnd) with others
      still recreates the others' documents on re-execution. (No behavior
      change today since loop ops are single-op materialized regions;
      correct-by-construction.)
    - output_manager.close_port_storage_writers: clear both the result and
      state writer registries after stopping, not just the state registry.
    - LoopEndOpDesc.generatePythonCode: normalize generated method-body
      indentation to 8 spaces (was 6), matching LoopStart and PEP8. The
      6-space form was valid Python; this is cosmetic consistency only.
--- amber/src/main/python/core/architecture/packaging/output_manager.py | 4 +++- amber/src/main/python/core/util/virtual_identity.py | 4 ++-- .../engine/architecture/scheduling/RegionExecutionCoordinator.scala | 6 +++++- .../scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala | 6 +++--- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py index a8581e85df..24075f474f 100644 --- a/amber/src/main/python/core/architecture/packaging/output_manager.py +++ b/amber/src/main/python/core/architecture/packaging/output_manager.py @@ -279,7 +279,9 @@ class OutputManager: writer.stop() for _, _, thread in registry.values(): thread.join() - self._port_state_writers.clear() + # Drop the stopped writers so a later reset/close doesn't act on + # stale entries (set_up_port_storage_writer repopulates on reset). + registry.clear() def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning) -> None: """ diff --git a/amber/src/main/python/core/util/virtual_identity.py b/amber/src/main/python/core/util/virtual_identity.py index 7d332f4bb6..143aa86d0e 100644 --- a/amber/src/main/python/core/util/virtual_identity.py +++ b/amber/src/main/python/core/util/virtual_identity.py @@ -30,7 +30,7 @@ MATERIALIZATION_READER_ACTOR_PREFIX = "MATERIALIZATION_READER_" def get_worker_index(worker_id: str) -> int: - match = worker_name_pattern.match(worker_id) + match = worker_name_pattern.fullmatch(worker_id) if match: return int(match.group(4)) raise ValueError("Invalid worker ID format") @@ -46,7 +46,7 @@ def get_operator_id(worker_id: str) -> str: non-match), this raises ``ValueError`` so a malformed worker id fails loudly rather than yielding a wrong id silently. 
""" - match = worker_name_pattern.match(worker_id) + match = worker_name_pattern.fullmatch(worker_id) if match: return match.group(2) raise ValueError(f"Invalid worker ID format: {worker_id}") diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 9d43f4e18b..1f3e563bca 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -618,7 +618,11 @@ class RegionExecutionCoordinator( // output once per outer iteration -- on the Python worker side in // MainLoop._process_state_frame -- which is orthogonal to this // region-provisioning reuse.) - val reusesOutputStorage = region.getOperators.exists(_.reusesOutputStorageOnReExecution) + // Decided per the operator that OWNS this port, not region-wide: a + // region mixing a reuse op (LoopEnd) with others must still recreate + // the others' documents on re-execution. 
+ val reusesOutputStorage = + region.getOperator(outputPortId.opId).reusesOutputStorageOnReExecution Seq((resultURI, schema), (stateURI, State.schema)).foreach { case (uri, sch) => RegionExecutionCoordinator.provisionOutputDocument( diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala index ff7486385c..b2d2a49de9 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala @@ -51,12 +51,12 @@ class LoopEndOpDesc extends LoopOpDesc { |class ProcessLoopEndOperator(LoopEndOperator): | @overrides | def process_state(self, state: State, port: int) -> Optional[State]: - | self.run_update($update, state) - | return None + | self.run_update($update, state) + | return None | | @overrides | def condition(self) -> bool: - | return self.eval_condition($condition) + | return self.eval_condition($condition) |""".encode } }
