This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new b8859b44ea refactor(amber): centralize worker-id parsing in
virtual_identity (#5706)
b8859b44ea is described below
commit b8859b44ea1a410daec3b3c0d4b7a71614f33c22
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 13 22:57:26 2026 -0700
refactor(amber): centralize worker-id parsing in virtual_identity (#5706)
### What changes were proposed in this PR?
Centralizes the Python worker's worker-id parsing in
`core/util/virtual_identity.py`:
- Adds `get_logical_op_id(worker_id)` — extracts the logical operator id
from a worker actor name (`Worker:WF<wf>-<op>-<layer>-<idx>`), raising
`ValueError` on a malformed id.
- Generalizes `worker_name_pattern` to capture the workflow id and
operator id explicitly.
- Switches both `get_worker_index` and `get_logical_op_id` to
`re.fullmatch`, so a malformed id with trailing junk now fails loudly
instead of parsing silently — matching the Scala
`VirtualIdentityUtils.getPhysicalOpId` full-match semantics the
docstring already claims.
| case | before | after |
|---|---|---|
| `get_worker_index`, well-formed id | worker index | same value |
| `get_worker_index`, malformed id (trailing junk) | parsed silently | raises `ValueError` |
| `get_logical_op_id` | — | new helper |
Behavior-preserving for well-formed worker ids. `get_logical_op_id`'s
production caller lands with the for-loop feature; the helper and its
test are independent and mergeable now.
### Any related issues, documentation, discussions?
Resolves #5708 (sub-issue of #4442 "Introduce for loop"). Split out of
#5700 to keep that PR reviewable, per @Xiao-zhen-Liu's
[review](https://github.com/apache/texera/pull/4206#pullrequestreview-4482667715).
### How was this PR tested?
`pytest src/test/python/core/util/test_virtual_identity.py` — 23
passing, covering well-formed ids, the new `get_logical_op_id`, and
malformed ids that now raise `ValueError`. `ruff check`/`format` clean.
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.
---
.../src/main/python/core/util/virtual_identity.py | 29 ++++++++++++--
.../org/apache/texera/amber/error/ErrorUtils.scala | 2 +-
.../test/python/core/util/test_virtual_identity.py | 44 ++++++++++++++++++++++
.../texera/amber/util/VirtualIdentityUtils.scala | 20 ++++++++++
.../amber/util/VirtualIdentityUtilsSpec.scala | 28 ++++++++++++++
5 files changed, 118 insertions(+), 5 deletions(-)
diff --git a/amber/src/main/python/core/util/virtual_identity.py
b/amber/src/main/python/core/util/virtual_identity.py
index 49da75fcd5..6893e7e8f0 100644
--- a/amber/src/main/python/core/util/virtual_identity.py
+++ b/amber/src/main/python/core/util/virtual_identity.py
@@ -24,16 +24,37 @@ from proto.org.apache.texera.amber.core import (
ActorVirtualIdentity,
)
-worker_name_pattern = re.compile(r"Worker:WF\d+-.+-(\w+)-(\d+)")
+worker_name_pattern = re.compile(r"Worker:WF(\d+)-(.+)-(\w+)-(\d+)")
MATERIALIZATION_READER_ACTOR_PREFIX = "MATERIALIZATION_READER_"
def get_worker_index(worker_id: str) -> int:
- match = worker_name_pattern.match(worker_id)
+ match = worker_name_pattern.fullmatch(worker_id)
if match:
- return int(match.group(2))
- raise ValueError("Invalid worker ID format")
+ return int(match.group(4))
+ raise ValueError(f"Invalid worker ID format: {worker_id}")
+
+
+def get_logical_op_id(worker_id: str) -> str:
+ """
+ Extract the logical operator id from a worker actor name of the form
+ ``Worker:WF<workflowId>-<operatorId>-<layerName>-<workerIndex>``.
+
+ Returns the logical operator id only (the ``<operatorId>`` segment); the
+ physical operator id additionally carries the ``<layerName>``. Name
+ parallels Scala ``VirtualIdentityUtils.getLogicalOpId`` so the logical /
+ physical distinction is visible at every call site (the matching Scala
+ physical-id accessor is ``getPhysicalOpId``).
+
+ Unlike the Scala sibling (which returns a ``__DummyOperator`` sentinel
+ on a non-match), this raises ``ValueError`` so a malformed worker id
+ fails loudly rather than yielding a wrong id silently.
+ """
+ match = worker_name_pattern.fullmatch(worker_id)
+ if match:
+ return match.group(2)
+ raise ValueError(f"Invalid worker ID format: {worker_id}")
def serialize_global_port_identity(obj: GlobalPortIdentity) -> str:
diff --git
a/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala
b/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala
index af16044af2..d7c021524c 100644
--- a/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala
@@ -110,7 +110,7 @@ object ErrorUtils {
var operatorId = "unknown operator"
var workerId = ""
if (actorIdOpt.isDefined) {
- operatorId = VirtualIdentityUtils.getPhysicalOpId(actorIdOpt.get).logicalOpId.id
+ operatorId = VirtualIdentityUtils.getLogicalOpId(actorIdOpt.get)
workerId = actorIdOpt.get.name
}
(operatorId, workerId)
diff --git a/amber/src/test/python/core/util/test_virtual_identity.py
b/amber/src/test/python/core/util/test_virtual_identity.py
index c2f6f63685..b94213c414 100644
--- a/amber/src/test/python/core/util/test_virtual_identity.py
+++ b/amber/src/test/python/core/util/test_virtual_identity.py
@@ -20,6 +20,7 @@ import pytest
from core.util.virtual_identity import (
deserialize_global_port_identity,
get_from_actor_id_for_input_port_storage,
+ get_logical_op_id,
get_worker_index,
serialize_global_port_identity,
)
@@ -75,6 +76,49 @@ class TestGetWorkerIndex:
# greedy `.+` and breaks the trailing match surfaces here.
assert get_worker_index("Worker:WF1-myOp-1st-physical-op-3") == 3
+ def test_raises_value_error_on_trailing_junk(self):
+ # fullmatch (not match) anchors the end of the string: a well-formed
+ # prefix followed by trailing junk must fail loudly. The old
+ # start-anchored match() would have silently returned 7 here.
+ with pytest.raises(ValueError, match="Invalid worker ID format"):
+ get_worker_index("Worker:WF1-myOp-main-7extra")
+
+
+class TestGetLogicalOpId:
+ def test_extracts_operator_id_from_canonical_name(self):
+ assert get_logical_op_id("Worker:WF1-myOp-main-0") == "myOp"
+
+ def test_isolates_operator_id_containing_hyphens(self):
+ # Load-bearing: operator ids contain dashes; greedy `.+` must still
+ # stop at the final <layer>-<index> tokens.
+ assert (
+ get_logical_op_id("Worker:WF12-PythonUDFV2-abc-def-main-0")
+ == "PythonUDFV2-abc-def"
+ )
+
+ def test_handles_non_main_layer_and_nonzero_index(self):
+ # The exact case the old `rsplit("-main-0")` got silently wrong.
+ assert get_logical_op_id("Worker:WF3-op-loopLayer-7") == "op"
+
+ def test_operator_id_ending_in_digits(self):
+ assert get_logical_op_id("Worker:WF1-op123-main-0") == "op123"
+
+ def test_raises_value_error_on_special_actor_id(self):
+ # Companions like CONTROLLER / SELF must fail loudly, not return junk.
+ with pytest.raises(ValueError, match="Invalid worker ID format"):
+ get_logical_op_id("CONTROLLER")
+
+ def test_raises_value_error_on_partial_match(self):
+ with pytest.raises(ValueError, match="Invalid worker ID format"):
+ get_logical_op_id("Worker:WF1-myOp-main")
+
+ def test_raises_value_error_on_trailing_junk(self):
+ # fullmatch anchors the end: a valid-looking prefix with trailing junk
+ # must fail loudly. The old start-anchored match() would have silently
+ # returned "myOp" here.
+ with pytest.raises(ValueError, match="Invalid worker ID format"):
+ get_logical_op_id("Worker:WF1-myOp-main-0extra")
+
class TestSerializeGlobalPortIdentity:
def test_emits_documented_format_for_canonical_input(self):
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
index 586d77a9c5..f91cca6af3 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
@@ -71,6 +71,26 @@ object VirtualIdentityUtils {
}
}
+ /**
+ * Extract the logical operator id from a worker actor id of the form
+ * `Worker:WF<workflowId>-<operatorId>-<layerName>-<workerIndex>`.
+ *
+ * Returns the logical operator id only (the `<operatorId>` segment);
+ * the physical operator id additionally carries the `<layerName>` and
+ * is exposed by [[getPhysicalOpId]]. Method name parallels
+ * `getPhysicalOpId` so callers can distinguish the two at the call
+ * site; the Python sibling is `core.util.virtual_identity.get_logical_op_id`.
+ *
+ * The Python helper raises `ValueError` on a non-match for fail-loud
+ * semantics; this Scala helper preserves the existing sentinel-on-miss
+ * behavior (`"__DummyOperator"`) so it stays a drop-in replacement for
+ * the inline `getPhysicalOpId(workerId).logicalOpId.id` pattern at
+ * call sites.
+ */
+ def getLogicalOpId(workerId: ActorVirtualIdentity): String = {
+ getPhysicalOpId(workerId).logicalOpId.id
+ }
+
def getWorkerIndex(workerId: ActorVirtualIdentity): Option[Int] = {
workerId.name match {
case workerNamePattern(_, _, _, idx) =>
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
index 8ebf7dabec..58e58f68e4 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
@@ -94,6 +94,34 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with
Matchers {
}
}
+ // ----- getLogicalOpId -----
+
+ "getLogicalOpId" should "return the logical operator id from a worker actor
name" in {
+ val actor = ActorVirtualIdentity("Worker:WF7-myOp-main-3")
+ VirtualIdentityUtils.getLogicalOpId(actor) shouldBe "myOp"
+ }
+
+ it should "match getPhysicalOpId(...).logicalOpId.id for worker actor names"
in {
+ // Pin the helper as a thin wrapper — `getLogicalOpId(workerId)` and
+ // `getPhysicalOpId(workerId).logicalOpId.id` must always agree, so
+ // call sites that migrate to the helper are guaranteed to keep
+ // identical behavior.
+ val actor = ActorVirtualIdentity("Worker:WF1-multi-part-op-main-0")
+ VirtualIdentityUtils.getLogicalOpId(actor) shouldBe
+ VirtualIdentityUtils.getPhysicalOpId(actor).logicalOpId.id
+ }
+
+ it should "fall back to the __DummyOperator sentinel for non-worker actor
names" in {
+ // The Python sibling raises ValueError on a non-match; the Scala
+ // helper preserves the existing __DummyOperator sentinel so it
+ // stays a drop-in replacement for the inline pattern at call sites
+ // (see VirtualIdentityUtils.getLogicalOpId docstring).
+ val controller = ActorVirtualIdentity("CONTROLLER")
+ VirtualIdentityUtils.getLogicalOpId(controller) shouldBe "__DummyOperator"
+ val self = ActorVirtualIdentity("SELF")
+ VirtualIdentityUtils.getLogicalOpId(self) shouldBe "__DummyOperator"
+ }
+
// ----- getWorkerIndex -----
"getWorkerIndex" should "return the trailing numeric workerId from a worker
actor name" in {