This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5706-01e070fd83dd7fd9a5574964880fe67d7f8f1257
in repository https://gitbox.apache.org/repos/asf/texera.git

commit b8859b44ea1a410daec3b3c0d4b7a71614f33c22
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 13 22:57:26 2026 -0700

    refactor(amber): centralize worker-id parsing in virtual_identity (#5706)
    
    ### What changes were proposed in this PR?
    
    Centralizes the Python worker's worker-id parsing in
    `core/util/virtual_identity.py`:
    
    - Adds `get_logical_op_id(worker_id)` — extracts the logical operator id
    from a worker actor name (`Worker:WF<wf>-<op>-<layer>-<idx>`), raising
    `ValueError` on a malformed id.
    - Generalizes `worker_name_pattern` to capture the workflow id and
    operator id explicitly.
    - Switches both `get_worker_index` and `get_logical_op_id` to
    `re.fullmatch`, so a malformed id with trailing junk now fails loudly
    instead of parsing silently — matching the Scala
    `VirtualIdentityUtils.getPhysicalOpId` full-match semantics the
    docstring already claims.
    
    | case | before | after |
    |---|---|---|
    | `get_worker_index`, well-formed id | worker index | same value |
    | `get_worker_index`, malformed id (trailing junk) | parsed silently | raises `ValueError` |
    | `get_logical_op_id` | — | new helper |
    
    Behavior-preserving for well-formed worker ids. The production caller of
    `get_logical_op_id` lands with the for-loop feature; the helper and its
    test are independent and mergeable now.
    
    ### Any related issues, documentation, discussions?
    
    Resolves #5708 (sub-issue of #4442 "Introduce for loop"). Split out of
    #5700 to keep that PR reviewable, per @Xiao-zhen-Liu's
    [review](https://github.com/apache/texera/pull/4206#pullrequestreview-4482667715).
    
    ### How was this PR tested?
    
    `pytest src/test/python/core/util/test_virtual_identity.py` — 23
    passing, covering well-formed ids, the new `get_logical_op_id`, and
    malformed ids that now raise `ValueError`. `ruff check`/`format` clean.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Co-authored with Claude Opus 4.8 in compliance with ASF.
---
 .../src/main/python/core/util/virtual_identity.py  | 29 ++++++++++++--
 .../org/apache/texera/amber/error/ErrorUtils.scala |  2 +-
 .../test/python/core/util/test_virtual_identity.py | 44 ++++++++++++++++++++++
 .../texera/amber/util/VirtualIdentityUtils.scala   | 20 ++++++++++
 .../amber/util/VirtualIdentityUtilsSpec.scala      | 28 ++++++++++++++
 5 files changed, 118 insertions(+), 5 deletions(-)

diff --git a/amber/src/main/python/core/util/virtual_identity.py 
b/amber/src/main/python/core/util/virtual_identity.py
index 49da75fcd5..6893e7e8f0 100644
--- a/amber/src/main/python/core/util/virtual_identity.py
+++ b/amber/src/main/python/core/util/virtual_identity.py
@@ -24,16 +24,37 @@ from proto.org.apache.texera.amber.core import (
     ActorVirtualIdentity,
 )
 
-worker_name_pattern = re.compile(r"Worker:WF\d+-.+-(\w+)-(\d+)")
+worker_name_pattern = re.compile(r"Worker:WF(\d+)-(.+)-(\w+)-(\d+)")
 
 MATERIALIZATION_READER_ACTOR_PREFIX = "MATERIALIZATION_READER_"
 
 
 def get_worker_index(worker_id: str) -> int:
-    match = worker_name_pattern.match(worker_id)
+    match = worker_name_pattern.fullmatch(worker_id)
     if match:
-        return int(match.group(2))
-    raise ValueError("Invalid worker ID format")
+        return int(match.group(4))
+    raise ValueError(f"Invalid worker ID format: {worker_id}")
+
+
+def get_logical_op_id(worker_id: str) -> str:
+    """
+    Extract the logical operator id from a worker actor name of the form
+    ``Worker:WF<workflowId>-<operatorId>-<layerName>-<workerIndex>``.
+
+    Returns the logical operator id only (the ``<operatorId>`` segment); the
+    physical operator id additionally carries the ``<layerName>``. Name
+    parallels Scala ``VirtualIdentityUtils.getLogicalOpId`` so the logical /
+    physical distinction is visible at every call site (the matching Scala
+    physical-id accessor is ``getPhysicalOpId``).
+
+    Unlike the Scala sibling (which returns a ``__DummyOperator`` sentinel
+    on a non-match), this raises ``ValueError`` so a malformed worker id
+    fails loudly rather than yielding a wrong id silently.
+    """
+    match = worker_name_pattern.fullmatch(worker_id)
+    if match:
+        return match.group(2)
+    raise ValueError(f"Invalid worker ID format: {worker_id}")
 
 
 def serialize_global_port_identity(obj: GlobalPortIdentity) -> str:
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala 
b/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala
index af16044af2..d7c021524c 100644
--- a/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/error/ErrorUtils.scala
@@ -110,7 +110,7 @@ object ErrorUtils {
     var operatorId = "unknown operator"
     var workerId = ""
     if (actorIdOpt.isDefined) {
-      operatorId = 
VirtualIdentityUtils.getPhysicalOpId(actorIdOpt.get).logicalOpId.id
+      operatorId = VirtualIdentityUtils.getLogicalOpId(actorIdOpt.get)
       workerId = actorIdOpt.get.name
     }
     (operatorId, workerId)
diff --git a/amber/src/test/python/core/util/test_virtual_identity.py 
b/amber/src/test/python/core/util/test_virtual_identity.py
index c2f6f63685..b94213c414 100644
--- a/amber/src/test/python/core/util/test_virtual_identity.py
+++ b/amber/src/test/python/core/util/test_virtual_identity.py
@@ -20,6 +20,7 @@ import pytest
 from core.util.virtual_identity import (
     deserialize_global_port_identity,
     get_from_actor_id_for_input_port_storage,
+    get_logical_op_id,
     get_worker_index,
     serialize_global_port_identity,
 )
@@ -75,6 +76,49 @@ class TestGetWorkerIndex:
         # greedy `.+` and breaks the trailing match surfaces here.
         assert get_worker_index("Worker:WF1-myOp-1st-physical-op-3") == 3
 
+    def test_raises_value_error_on_trailing_junk(self):
+        # fullmatch (not match) anchors the end of the string: a well-formed
+        # prefix followed by trailing junk must fail loudly. The old
+        # start-anchored match() would have silently returned 7 here.
+        with pytest.raises(ValueError, match="Invalid worker ID format"):
+            get_worker_index("Worker:WF1-myOp-main-7extra")
+
+
+class TestGetLogicalOpId:
+    def test_extracts_operator_id_from_canonical_name(self):
+        assert get_logical_op_id("Worker:WF1-myOp-main-0") == "myOp"
+
+    def test_isolates_operator_id_containing_hyphens(self):
+        # Load-bearing: operator ids contain dashes; greedy `.+` must still
+        # stop at the final <layer>-<index> tokens.
+        assert (
+            get_logical_op_id("Worker:WF12-PythonUDFV2-abc-def-main-0")
+            == "PythonUDFV2-abc-def"
+        )
+
+    def test_handles_non_main_layer_and_nonzero_index(self):
+        # The exact case the old `rsplit("-main-0")` got silently wrong.
+        assert get_logical_op_id("Worker:WF3-op-loopLayer-7") == "op"
+
+    def test_operator_id_ending_in_digits(self):
+        assert get_logical_op_id("Worker:WF1-op123-main-0") == "op123"
+
+    def test_raises_value_error_on_special_actor_id(self):
+        # Companions like CONTROLLER / SELF must fail loudly, not return junk.
+        with pytest.raises(ValueError, match="Invalid worker ID format"):
+            get_logical_op_id("CONTROLLER")
+
+    def test_raises_value_error_on_partial_match(self):
+        with pytest.raises(ValueError, match="Invalid worker ID format"):
+            get_logical_op_id("Worker:WF1-myOp-main")
+
+    def test_raises_value_error_on_trailing_junk(self):
+        # fullmatch anchors the end: a valid-looking prefix with trailing junk
+        # must fail loudly. The old start-anchored match() would have silently
+        # returned "myOp" here.
+        with pytest.raises(ValueError, match="Invalid worker ID format"):
+            get_logical_op_id("Worker:WF1-myOp-main-0extra")
+
 
 class TestSerializeGlobalPortIdentity:
     def test_emits_documented_format_for_canonical_input(self):
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
index 586d77a9c5..f91cca6af3 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
@@ -71,6 +71,26 @@ object VirtualIdentityUtils {
     }
   }
 
+  /**
+    * Extract the logical operator id from a worker actor id of the form
+    * `Worker:WF<workflowId>-<operatorId>-<layerName>-<workerIndex>`.
+    *
+    * Returns the logical operator id only (the `<operatorId>` segment);
+    * the physical operator id additionally carries the `<layerName>` and
+    * is exposed by [[getPhysicalOpId]]. Method name parallels
+    * `getPhysicalOpId` so callers can distinguish the two at the call
+    * site; the Python sibling is 
`core.util.virtual_identity.get_logical_op_id`.
+    *
+    * The Python helper raises `ValueError` on a non-match for fail-loud
+    * semantics; this Scala helper preserves the existing sentinel-on-miss
+    * behavior (`"__DummyOperator"`) so it stays a drop-in replacement for
+    * the inline `getPhysicalOpId(workerId).logicalOpId.id` pattern at
+    * call sites.
+    */
+  def getLogicalOpId(workerId: ActorVirtualIdentity): String = {
+    getPhysicalOpId(workerId).logicalOpId.id
+  }
+
   def getWorkerIndex(workerId: ActorVirtualIdentity): Option[Int] = {
     workerId.name match {
       case workerNamePattern(_, _, _, idx) =>
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
index 8ebf7dabec..58e58f68e4 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
@@ -94,6 +94,34 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with 
Matchers {
     }
   }
 
+  // ----- getLogicalOpId -----
+
+  "getLogicalOpId" should "return the logical operator id from a worker actor 
name" in {
+    val actor = ActorVirtualIdentity("Worker:WF7-myOp-main-3")
+    VirtualIdentityUtils.getLogicalOpId(actor) shouldBe "myOp"
+  }
+
+  it should "match getPhysicalOpId(...).logicalOpId.id for worker actor names" 
in {
+    // Pin the helper as a thin wrapper — `getLogicalOpId(workerId)` and
+    // `getPhysicalOpId(workerId).logicalOpId.id` must always agree, so
+    // call sites that migrate to the helper are guaranteed to keep
+    // identical behavior.
+    val actor = ActorVirtualIdentity("Worker:WF1-multi-part-op-main-0")
+    VirtualIdentityUtils.getLogicalOpId(actor) shouldBe
+      VirtualIdentityUtils.getPhysicalOpId(actor).logicalOpId.id
+  }
+
+  it should "fall back to the __DummyOperator sentinel for non-worker actor 
names" in {
+    // The Python sibling raises ValueError on a non-match; the Scala
+    // helper preserves the existing __DummyOperator sentinel so it
+    // stays a drop-in replacement for the inline pattern at call sites
+    // (see VirtualIdentityUtils.getLogicalOpId docstring).
+    val controller = ActorVirtualIdentity("CONTROLLER")
+    VirtualIdentityUtils.getLogicalOpId(controller) shouldBe "__DummyOperator"
+    val self = ActorVirtualIdentity("SELF")
+    VirtualIdentityUtils.getLogicalOpId(self) shouldBe "__DummyOperator"
+  }
+
   // ----- getWorkerIndex -----
 
   "getWorkerIndex" should "return the trailing numeric workerId from a worker 
actor name" in {

Reply via email to