This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new f5cf00a8b1 test(workflow-operator): add unit test coverage for map/filter/flatmap abstract LogicalOp bases (#5796)
f5cf00a8b1 is described below

commit f5cf00a8b1b7a91d18b2505b4b3de3dfcfdcbdd2
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jun 19 22:56:58 2026 -0700

    test(workflow-operator): add unit test coverage for map/filter/flatmap abstract LogicalOp bases (#5796)
    
    ### What changes were proposed in this PR?
    
    Pin behavior of three tiny abstract `LogicalOp` bases in
    `common/workflow-operator/`. Two of them (`MapOpDesc`, `FilterOpDesc`)
    carry a `runtimeReconfiguration` override that delegates to
    `newOpDesc.getPhysicalOp` and wraps the result in `Success((_, None))` —
    drift in that delegation (swapped arg order, accidental `Failure` wrap,
    surprise `StateTransferFunc`) would land silently. No production-code
    changes.
    
    | Spec | Source class | Tests |
    | --- | --- | --- |
    | `FlatMapOpDescSpec` | `FlatMapOpDesc` | 4 |
    | `MapOpDescSpec` | `MapOpDesc` | 6 |
    | `FilterOpDescSpec` | `FilterOpDesc` | 6 |
    
    All three spec files follow the `<srcClassName>Spec.scala` one-to-one
    convention.
    
    **Behavior pinned (per descriptor)**
    
    | Surface | Contract |
    | --- | --- |
    | Abstract-class shape | `Modifier.isAbstract(classOf[X].getModifiers)` |
    | LogicalOp inheritance | upcast compiles; in `FlatMapOpDescSpec`,
    `case _: LogicalOp` also matches a concrete subclass |
    | `runtimeReconfiguration` (Map/Filter only) | delegates to
    `newOpDesc.getPhysicalOp(workflowId, executionId)` |
    | Argument isolation | `oldOpDesc.getPhysicalOp` is **not** called |
    | Result wrapping | returns `Success` with `(_, None)` for the
    `StateTransferFunc` slot |
    | Exception propagation | a throw from `newOpDesc.getPhysicalOp` is
    **not** caught — it propagates up |
    
    The specs use minimal `StubMapDesc` / `StubFilterDesc` subclasses that
    record every `getPhysicalOp` call so the delegation is observable
    end-to-end. Sentinel returns use `null.asInstanceOf[PhysicalOp]` — the
    production code only passes the return value to `Success(...)` without
    inspecting it, so the cast is safe for these tests.
    
    ### Any related issues, documentation, discussions?
    
    Closes #5793.
    
    ### How was this PR tested?
    
    Pure unit-test additions; verified locally with:
    
    - `sbt "WorkflowOperator/testOnly
    org.apache.texera.amber.operator.flatmap.FlatMapOpDescSpec
    org.apache.texera.amber.operator.map.MapOpDescSpec
    org.apache.texera.amber.operator.filter.FilterOpDescSpec"` — 16 tests,
    all green
    - `sbt "WorkflowOperator/Test/scalafmtCheck"` — clean
    - CI to confirm
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.7 [1M context])
---
 .../amber/operator/filter/FilterOpDescSpec.scala   | 116 ++++++++++++++++++++
 .../amber/operator/flatmap/FlatMapOpDescSpec.scala |  78 ++++++++++++++
 .../texera/amber/operator/map/MapOpDescSpec.scala  | 120 +++++++++++++++++++++
 3 files changed, 314 insertions(+)

diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpDescSpec.scala
new file mode 100644
index 0000000000..5200eb3490
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpDescSpec.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.filter
+
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.PhysicalOp
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
+import java.lang.reflect.Modifier
+import org.scalatest.flatspec.AnyFlatSpec
+
+class FilterOpDescSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // Test-only concrete subclass — same shape as the MapOpDesc stub; the
+  // two abstract bases share the same `runtimeReconfiguration` contract.
+  // ---------------------------------------------------------------------------
+
+  private class StubFilterDesc extends FilterOpDesc {
+    var calls: List[(WorkflowIdentity, ExecutionIdentity)] = Nil
+    override def getPhysicalOp(
+        workflowId: WorkflowIdentity,
+        executionId: ExecutionIdentity
+    ): PhysicalOp = {
+      calls = calls :+ ((workflowId, executionId))
+      null.asInstanceOf[PhysicalOp]
+    }
+    override def operatorInfo: OperatorInfo =
+      OperatorInfo(
+        "StubFilter",
+        "stub filter",
+        OperatorGroupConstants.UTILITY_GROUP,
+        inputPorts = List.empty,
+        outputPorts = List.empty
+      )
+  }
+
+  private val workflowId = WorkflowIdentity(7L)
+  private val executionId = ExecutionIdentity(11L)
+
+  // ---------------------------------------------------------------------------
+  // Abstract-class shape
+  // ---------------------------------------------------------------------------
+
+  "FilterOpDesc" should "be declared abstract (cannot be instantiated directly)" in {
+    assert(Modifier.isAbstract(classOf[FilterOpDesc].getModifiers))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Inheritance — FilterOpDesc is a LogicalOp
+  // ---------------------------------------------------------------------------
+
+  it should "extend LogicalOp (compile-time enforced)" in {
+    val s: LogicalOp = new StubFilterDesc
+    assert(s != null)
+  }
+
+  // ---------------------------------------------------------------------------
+  // runtimeReconfiguration — delegates to newOpDesc and ignores oldOpDesc
+  // ---------------------------------------------------------------------------
+
+  "FilterOpDesc.runtimeReconfiguration" should
+    "delegate to newOpDesc.getPhysicalOp with the supplied workflow/execution ids" in {
+    val oldDesc = new StubFilterDesc
+    val newDesc = new StubFilterDesc
+    val result = oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, newDesc)
+    assert(result.isSuccess)
+    assert(newDesc.calls == List((workflowId, executionId)))
+  }
+
+  it should "not call oldOpDesc.getPhysicalOp" in {
+    val oldDesc = new StubFilterDesc
+    val newDesc = new StubFilterDesc
+    oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, newDesc)
+    assert(oldDesc.calls == Nil)
+  }
+
+  it should "return None for the StateTransferFunc slot" in {
+    val oldDesc = new StubFilterDesc
+    val newDesc = new StubFilterDesc
+    val result = oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, newDesc)
+    val (_, transferOpt) = result.get
+    assert(transferOpt.isEmpty)
+  }
+
+  it should "propagate exceptions from newOpDesc.getPhysicalOp (not catch them)" in {
+    val oldDesc = new StubFilterDesc
+    val throwingDesc = new StubFilterDesc {
+      override def getPhysicalOp(
+          workflowId: WorkflowIdentity,
+          executionId: ExecutionIdentity
+      ): PhysicalOp = throw new RuntimeException("sentinel:newDesc")
+    }
+    val ex = intercept[RuntimeException] {
+      oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, throwingDesc)
+    }
+    assert(ex.getMessage == "sentinel:newDesc")
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/flatmap/FlatMapOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/flatmap/FlatMapOpDescSpec.scala
new file mode 100644
index 0000000000..937c7a4c64
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/flatmap/FlatMapOpDescSpec.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.flatmap
+
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
+import java.lang.reflect.Modifier
+import org.scalatest.flatspec.AnyFlatSpec
+
+class FlatMapOpDescSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // Test-only concrete subclass
+  // ---------------------------------------------------------------------------
+
+  private class StubFlatMap extends FlatMapOpDesc {
+    override def operatorInfo: OperatorInfo =
+      OperatorInfo(
+        "StubFlatMap",
+        "stub flatmap",
+        OperatorGroupConstants.UTILITY_GROUP,
+        inputPorts = List.empty,
+        outputPorts = List.empty
+      )
+  }
+
+  // ---------------------------------------------------------------------------
+  // Abstract-class shape
+  // ---------------------------------------------------------------------------
+
+  "FlatMapOpDesc" should "be declared abstract (cannot be instantiated directly)" in {
+    assert(Modifier.isAbstract(classOf[FlatMapOpDesc].getModifiers))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Inheritance — FlatMapOpDesc is a LogicalOp
+  // ---------------------------------------------------------------------------
+
+  it should "extend LogicalOp (compile-time enforced)" in {
+    val s: LogicalOp = new StubFlatMap
+    assert(s != null)
+  }
+
+  it should "match the LogicalOp type-pattern" in {
+    val any: AnyRef = new StubFlatMap
+    val matched = any match {
+      case _: LogicalOp => true
+      case _            => false
+    }
+    assert(matched)
+  }
+
+  it should "match the FlatMapOpDesc type-pattern" in {
+    val any: AnyRef = new StubFlatMap
+    val matched = any match {
+      case _: FlatMapOpDesc => true
+      case _                => false
+    }
+    assert(matched)
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/map/MapOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/map/MapOpDescSpec.scala
new file mode 100644
index 0000000000..aa61154ca8
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/map/MapOpDescSpec.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.map
+
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.PhysicalOp
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
+import java.lang.reflect.Modifier
+import org.scalatest.flatspec.AnyFlatSpec
+
+class MapOpDescSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // Test-only concrete subclass — records every call to `getPhysicalOp`
+  // and returns a stable sentinel so the spec can verify the delegation.
+  //
+  // Casting `null` to PhysicalOp is acceptable here: the production
+  // `runtimeReconfiguration` just wraps the return value in
+  // `Success((_, None))` without inspecting it.
+  // ---------------------------------------------------------------------------
+
+  private class StubMapDesc extends MapOpDesc {
+    var calls: List[(WorkflowIdentity, ExecutionIdentity)] = Nil
+    override def getPhysicalOp(
+        workflowId: WorkflowIdentity,
+        executionId: ExecutionIdentity
+    ): PhysicalOp = {
+      calls = calls :+ ((workflowId, executionId))
+      null.asInstanceOf[PhysicalOp]
+    }
+    override def operatorInfo: OperatorInfo =
+      OperatorInfo(
+        "StubMap",
+        "stub map",
+        OperatorGroupConstants.UTILITY_GROUP,
+        inputPorts = List.empty,
+        outputPorts = List.empty
+      )
+  }
+
+  private val workflowId = WorkflowIdentity(7L)
+  private val executionId = ExecutionIdentity(11L)
+
+  // ---------------------------------------------------------------------------
+  // Abstract-class shape
+  // ---------------------------------------------------------------------------
+
+  "MapOpDesc" should "be declared abstract (cannot be instantiated directly)" in {
+    assert(Modifier.isAbstract(classOf[MapOpDesc].getModifiers))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Inheritance — MapOpDesc is a LogicalOp
+  // ---------------------------------------------------------------------------
+
+  it should "extend LogicalOp (compile-time enforced)" in {
+    val s: LogicalOp = new StubMapDesc
+    assert(s != null)
+  }
+
+  // ---------------------------------------------------------------------------
+  // runtimeReconfiguration — delegates to newOpDesc and ignores oldOpDesc
+  // ---------------------------------------------------------------------------
+
+  "MapOpDesc.runtimeReconfiguration" should
+    "delegate to newOpDesc.getPhysicalOp with the supplied workflow/execution ids" in {
+    val oldDesc = new StubMapDesc
+    val newDesc = new StubMapDesc
+    val result = oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, newDesc)
+    assert(result.isSuccess)
+    assert(newDesc.calls == List((workflowId, executionId)))
+  }
+
+  it should "not call oldOpDesc.getPhysicalOp" in {
+    val oldDesc = new StubMapDesc
+    val newDesc = new StubMapDesc
+    oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, newDesc)
+    assert(oldDesc.calls == Nil)
+  }
+
+  it should "return None for the StateTransferFunc slot" in {
+    val oldDesc = new StubMapDesc
+    val newDesc = new StubMapDesc
+    val result = oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, newDesc)
+    val (_, transferOpt) = result.get
+    assert(transferOpt.isEmpty)
+  }
+
+  it should "propagate exceptions from newOpDesc.getPhysicalOp (not catch them)" in {
+    val oldDesc = new StubMapDesc
+    val throwingDesc = new StubMapDesc {
+      override def getPhysicalOp(
+          workflowId: WorkflowIdentity,
+          executionId: ExecutionIdentity
+      ): PhysicalOp = throw new RuntimeException("sentinel:newDesc")
+    }
+    val ex = intercept[RuntimeException] {
+      oldDesc.runtimeReconfiguration(workflowId, executionId, oldDesc, throwingDesc)
+    }
+    assert(ex.getMessage == "sentinel:newDesc")
+  }
+}

Reply via email to