This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5706-01e070fd83dd7fd9a5574964880fe67d7f8f1257 in repository https://gitbox.apache.org/repos/asf/texera.git
commit b8859b44ea1a410daec3b3c0d4b7a71614f33c22
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 13 22:57:26 2026 -0700

refactor(amber): centralize worker-id parsing in virtual_identity (#5706)

### What changes were proposed in this PR?

Centralizes the Python worker's worker-id parsing in `core/util/virtual_identity.py`:

- Adds `get_logical_op_id(worker_id)` — extracts the logical operator id from a worker actor name (`Worker:WF<wf>-<op>-<layer>-<idx>`), raising `ValueError` on a malformed id.
- Generalizes `worker_name_pattern` to capture the workflow id and operator id explicitly.
- Switches both `get_worker_index` and `get_logical_op_id` to `re.fullmatch`, so a malformed id with trailing junk now fails loudly instead of parsing silently — matching the Scala `VirtualIdentityUtils.getPhysicalOpId` full-match semantics the docstring already claims.
- Adds the Scala sibling `VirtualIdentityUtils.getLogicalOpId(workerId)`, a thin wrapper over `getPhysicalOpId(workerId).logicalOpId.id` that keeps the existing `__DummyOperator` sentinel on a non-match, and switches `ErrorUtils` to call it.

| case | before | after |
|---|---|---|
| `get_worker_index`, well-formed id | worker index | same value |
| `get_worker_index`, malformed id (trailing junk) | parsed silently | raises `ValueError` |
| `get_logical_op_id` | — | new helper |

Behavior-preserving for well-formed worker ids. `get_logical_op_id`'s production caller lands with the for-loop feature; the helper and its test are independent and mergeable now.

### Any related issues, documentation, discussions?

Resolves #5708 (sub-issue of #4442 "Introduce for loop"). Split out of #5700 to keep that PR reviewable, per @Xiao-zhen-Liu's [review](https://github.com/apache/texera/pull/4206#pullrequestreview-4482667715).

### How was this PR tested?

`pytest src/test/python/core/util/test_virtual_identity.py` — 23 passing, covering well-formed ids, the new `get_logical_op_id`, and malformed ids that now raise `ValueError`. `ruff check`/`format` clean.

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.
--- .../src/main/python/core/util/virtual_identity.py | 29 ++++++++++++-- .../org/apache/texera/amber/error/ErrorUtils.scala | 2 +- .../test/python/core/util/test_virtual_identity.py | 44 ++++++++++++++++++++++ .../texera/amber/util/VirtualIdentityUtils.scala | 20 ++++++++++ .../amber/util/VirtualIdentityUtilsSpec.scala | 28 ++++++++++++++ 5 files changed, 118 insertions(+), 5 deletions(-) diff --git a/amber/src/main/python/core/util/virtual_identity.py b/amber/src/main/python/core/util/virtual_identity.py index 49da75fcd5..6893e7e8f0 100644 --- a/amber/src/main/python/core/util/virtual_identity.py +++ b/amber/src/main/python/core/util/virtual_identity.py @@ -24,16 +24,37 @@ from proto.org.apache.texera.amber.core import ( ActorVirtualIdentity, ) -worker_name_pattern = re.compile(r"Worker:WF\d+-.+-(\w+)-(\d+)") +worker_name_pattern = re.compile(r"Worker:WF(\d+)-(.+)-(\w+)-(\d+)") MATERIALIZATION_READER_ACTOR_PREFIX = "MATERIALIZATION_READER_" def get_worker_index(worker_id: str) -> int: - match = worker_name_pattern.match(worker_id) + match = worker_name_pattern.fullmatch(worker_id) if match: - return int(match.group(2)) - raise ValueError("Invalid worker ID format") + return int(match.group(4)) + raise ValueError(f"Invalid worker ID format: {worker_id}") + + +def get_logical_op_id(worker_id: str) -> str: + """ + Extract the logical operator id from a worker actor name of the form + ``Worker:WF<workflowId>-<operatorId>-<layerName>-<workerIndex>``. + + Returns the logical operator id only (the ``<operatorId>`` segment); the + physical operator id additionally carries the ``<layerName>``. Name + parallels Scala ``VirtualIdentityUtils.getLogicalOpId`` so the logical / + physical distinction is visible at every call site (the matching Scala + physical-id accessor is ``getPhysicalOpId``). 
+ + Unlike the Scala sibling (which returns a ``__DummyOperator`` sentinel + on a non-match), this raises ``ValueError`` so a malformed worker id + fails loudly rather than yielding a wrong id silently. + """ + match = worker_name_pattern.fullmatch(worker_id) + if match: + return match.group(2) + raise ValueError(f"Invalid worker ID format: {worker_id}") def serialize_global_port_identity(obj: GlobalPortIdentity) -> str: diff --git a/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala b/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala index af16044af2..d7c021524c 100644 --- a/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala +++ b/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala @@ -110,7 +110,7 @@ object ErrorUtils { var operatorId = "unknown operator" var workerId = "" if (actorIdOpt.isDefined) { - operatorId = VirtualIdentityUtils.getPhysicalOpId(actorIdOpt.get).logicalOpId.id + operatorId = VirtualIdentityUtils.getLogicalOpId(actorIdOpt.get) workerId = actorIdOpt.get.name } (operatorId, workerId) diff --git a/amber/src/test/python/core/util/test_virtual_identity.py b/amber/src/test/python/core/util/test_virtual_identity.py index c2f6f63685..b94213c414 100644 --- a/amber/src/test/python/core/util/test_virtual_identity.py +++ b/amber/src/test/python/core/util/test_virtual_identity.py @@ -20,6 +20,7 @@ import pytest from core.util.virtual_identity import ( deserialize_global_port_identity, get_from_actor_id_for_input_port_storage, + get_logical_op_id, get_worker_index, serialize_global_port_identity, ) @@ -75,6 +76,49 @@ class TestGetWorkerIndex: # greedy `.+` and breaks the trailing match surfaces here. assert get_worker_index("Worker:WF1-myOp-1st-physical-op-3") == 3 + def test_raises_value_error_on_trailing_junk(self): + # fullmatch (not match) anchors the end of the string: a well-formed + # prefix followed by trailing junk must fail loudly. 
The old + # start-anchored match() would have silently returned 7 here. + with pytest.raises(ValueError, match="Invalid worker ID format"): + get_worker_index("Worker:WF1-myOp-main-7extra") + + +class TestGetLogicalOpId: + def test_extracts_operator_id_from_canonical_name(self): + assert get_logical_op_id("Worker:WF1-myOp-main-0") == "myOp" + + def test_isolates_operator_id_containing_hyphens(self): + # Load-bearing: operator ids contain dashes; greedy `.+` must still + # stop at the final <layer>-<index> tokens. + assert ( + get_logical_op_id("Worker:WF12-PythonUDFV2-abc-def-main-0") + == "PythonUDFV2-abc-def" + ) + + def test_handles_non_main_layer_and_nonzero_index(self): + # The exact case the old `rsplit("-main-0")` got silently wrong. + assert get_logical_op_id("Worker:WF3-op-loopLayer-7") == "op" + + def test_operator_id_ending_in_digits(self): + assert get_logical_op_id("Worker:WF1-op123-main-0") == "op123" + + def test_raises_value_error_on_special_actor_id(self): + # Companions like CONTROLLER / SELF must fail loudly, not return junk. + with pytest.raises(ValueError, match="Invalid worker ID format"): + get_logical_op_id("CONTROLLER") + + def test_raises_value_error_on_partial_match(self): + with pytest.raises(ValueError, match="Invalid worker ID format"): + get_logical_op_id("Worker:WF1-myOp-main") + + def test_raises_value_error_on_trailing_junk(self): + # fullmatch anchors the end: a valid-looking prefix with trailing junk + # must fail loudly. The old start-anchored match() would have silently + # returned "myOp" here. 
+ with pytest.raises(ValueError, match="Invalid worker ID format"): + get_logical_op_id("Worker:WF1-myOp-main-0extra") + class TestSerializeGlobalPortIdentity: def test_emits_documented_format_for_canonical_input(self): diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala index 586d77a9c5..f91cca6af3 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala @@ -71,6 +71,26 @@ object VirtualIdentityUtils { } } + /** + * Extract the logical operator id from a worker actor id of the form + * `Worker:WF<workflowId>-<operatorId>-<layerName>-<workerIndex>`. + * + * Returns the logical operator id only (the `<operatorId>` segment); + * the physical operator id additionally carries the `<layerName>` and + * is exposed by [[getPhysicalOpId]]. Method name parallels + * `getPhysicalOpId` so callers can distinguish the two at the call + * site; the Python sibling is `core.util.virtual_identity.get_logical_op_id`. + * + * The Python helper raises `ValueError` on a non-match for fail-loud + * semantics; this Scala helper preserves the existing sentinel-on-miss + * behavior (`"__DummyOperator"`) so it stays a drop-in replacement for + * the inline `getPhysicalOpId(workerId).logicalOpId.id` pattern at + * call sites. 
+ */ + def getLogicalOpId(workerId: ActorVirtualIdentity): String = { + getPhysicalOpId(workerId).logicalOpId.id + } + def getWorkerIndex(workerId: ActorVirtualIdentity): Option[Int] = { workerId.name match { case workerNamePattern(_, _, _, idx) => diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala index 8ebf7dabec..58e58f68e4 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala @@ -94,6 +94,34 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with Matchers { } } + // ----- getLogicalOpId ----- + + "getLogicalOpId" should "return the logical operator id from a worker actor name" in { + val actor = ActorVirtualIdentity("Worker:WF7-myOp-main-3") + VirtualIdentityUtils.getLogicalOpId(actor) shouldBe "myOp" + } + + it should "match getPhysicalOpId(...).logicalOpId.id for worker actor names" in { + // Pin the helper as a thin wrapper — `getLogicalOpId(workerId)` and + // `getPhysicalOpId(workerId).logicalOpId.id` must always agree, so + // call sites that migrate to the helper are guaranteed to keep + // identical behavior. + val actor = ActorVirtualIdentity("Worker:WF1-multi-part-op-main-0") + VirtualIdentityUtils.getLogicalOpId(actor) shouldBe + VirtualIdentityUtils.getPhysicalOpId(actor).logicalOpId.id + } + + it should "fall back to the __DummyOperator sentinel for non-worker actor names" in { + // The Python sibling raises ValueError on a non-match; the Scala + // helper preserves the existing __DummyOperator sentinel so it + // stays a drop-in replacement for the inline pattern at call sites + // (see VirtualIdentityUtils.getLogicalOpId docstring). 
+ val controller = ActorVirtualIdentity("CONTROLLER") + VirtualIdentityUtils.getLogicalOpId(controller) shouldBe "__DummyOperator" + val self = ActorVirtualIdentity("SELF") + VirtualIdentityUtils.getLogicalOpId(self) shouldBe "__DummyOperator" + } + // ----- getWorkerIndex ----- "getWorkerIndex" should "return the trailing numeric workerId from a worker actor name" in {
