This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5876-3ced705c721574a06ce7031019fdbbb5ce22106e in repository https://gitbox.apache.org/repos/asf/texera.git
commit 0f98a140714f87059458e06a76234cf4a5ec98b0 Author: Xinyuan Lin <[email protected]> AuthorDate: Sun Jun 21 23:46:32 2026 -0700 test(workflow-operator): add unit test coverage for Python UDF operator descriptors (#5876) ### What changes were proposed in this PR? Pin the behavior of three previously untested Python UDF descriptors in `common/workflow-operator` (package `org.apache.texera.amber.operator.udf.python` and its `source` subpackage). No production-code changes. | Spec | Source class | Tests | | --- | --- | --- | | `PythonUDFOpDescV2Spec` | `PythonUDFOpDescV2` | 9 | | `PythonUDFSourceOpDescV2Spec` | `PythonUDFSourceOpDescV2` | 5 | | `PythonTableReducerOpDescSpec` | `PythonTableReducerOpDesc` | 6 | **Behavior pinned** | Surface | Contract | | --- | --- | | `operatorInfo` | exact name + `PYTHON_GROUP`; `PythonUDFOpDescV2` dynamic input/output ports with a default 1-in/1-out shape; `PythonUDFSourceOpDescV2` zero inputs / one output + `supportReconfiguration` | | `getPhysicalOp` | wires `OpExecWithCode(code, "python")`; port identities carried; rejects `workers <= 0` (`IllegalArgumentException`); `PythonUDFOpDescV2` also rejects a blank `envName` when `defaultEnv` is `false` (`RuntimeException`) | | Schema propagation | `PythonUDFOpDescV2`: `retainInputColumns=false` → only the output columns, `true` → input + output columns (full map keyed by the declared output port), and an output column whose name collides with a retained input column is rejected; `PythonUDFSourceOpDescV2`: `sourceSchema()` from the `columns` field; `PythonTableReducerOpDesc`: `getOutputSchemas` folds the lambda units into output columns and rejects the empty default | | Round-trip | config fields preserved through the polymorphic base | ### Any related issues, documentation, discussions? Closes #5873. ### How was this PR tested? - `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.udf.python.PythonUDFOpDescV2Spec org.apache.texera.amber.operator.udf.python.source.PythonUDFSourceOpDescV2Spec org.apache.texera.amber.operator.udf.python.PythonTableReducerOpDescSpec"` — 20 tests (9 + 5 + 6), all green - `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt "WorkflowOperator/Test/scalafix --check"` — clean - CI to confirm ### Was this PR authored or co-authored using generative AI tooling? 
Generated-by: Claude Code (Opus 4.8 [1M context]) --- .../udf/python/PythonTableReducerOpDescSpec.scala | 83 +++++++++++++ .../udf/python/PythonUDFOpDescV2Spec.scala | 137 +++++++++++++++++++++ .../source/PythonUDFSourceOpDescV2Spec.scala | 89 +++++++++++++ 3 files changed, 309 insertions(+) diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonTableReducerOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonTableReducerOpDescSpec.scala new file mode 100644 index 0000000000..ba1a7a20ff --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonTableReducerOpDescSpec.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.operator.udf.python + +import org.apache.texera.amber.core.tuple.{AttributeType, Schema} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class PythonTableReducerOpDescSpec extends AnyFlatSpec with Matchers { + + private def unit(name: String, expr: String, t: AttributeType): LambdaAttributeUnit = + new LambdaAttributeUnit(name, expr, null, t) + + "PythonTableReducerOpDesc.operatorInfo" should "advertise the name and Python group" in { + val info = (new PythonTableReducerOpDesc).operatorInfo + info.userFriendlyName shouldBe "Python Table Reducer" + info.operatorDescription shouldBe "Reduce Table to Tuple" + info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP + info.inputPorts should have length 1 + info.outputPorts should have length 1 + } + + "PythonTableReducerOpDesc" should "default lambdaAttributeUnits to an empty list" in { + (new PythonTableReducerOpDesc).lambdaAttributeUnits shouldBe empty + } + + "PythonTableReducerOpDesc.getOutputSchemas" should + "fold each lambda unit into an output column keyed by the declared output port" in { + val d = new PythonTableReducerOpDesc + d.lambdaAttributeUnits = List(unit("score", "1 + 1", AttributeType.INTEGER)) + d.getOutputSchemas(Map.empty) shouldBe Map( + d.operatorInfo.outputPorts.head.id -> Schema().add("score", AttributeType.INTEGER) + ) + } + + it should "reject an empty lambda list" in { + intercept[IllegalArgumentException] { + (new PythonTableReducerOpDesc).getOutputSchemas(Map.empty) + } + } + + "PythonTableReducerOpDesc.generatePythonCode" should "emit the reducer table operator" in { + val d = new PythonTableReducerOpDesc + d.lambdaAttributeUnits = List(unit("score", "1 + 1", AttributeType.INTEGER)) + val code = 
d.generatePythonCode() + code should include("class ProcessTableOperator(UDFTableOperator)") + code should include("score") + } + + "PythonTableReducerOpDesc" should "round-trip its lambda units through the polymorphic base" in { + val d = new PythonTableReducerOpDesc + d.lambdaAttributeUnits = + List(new LambdaAttributeUnit("score", "1 + 1", "scoreOut", AttributeType.INTEGER)) + val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) + restored shouldBe a[PythonTableReducerOpDesc] + val r = restored.asInstanceOf[PythonTableReducerOpDesc] + r.lambdaAttributeUnits should have length 1 + r.lambdaAttributeUnits.head.attributeName shouldBe "score" + r.lambdaAttributeUnits.head.expression shouldBe "1 + 1" + r.lambdaAttributeUnits.head.newAttributeName shouldBe "scoreOut" + r.lambdaAttributeUnits.head.attributeType shouldBe AttributeType.INTEGER + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2Spec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2Spec.scala new file mode 100644 index 0000000000..be319f4e41 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2Spec.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.python + +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class PythonUDFOpDescV2Spec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "PythonUDFOpDescV2.operatorInfo" should + "advertise the name, Python group, dynamic ports, and a default 1-in/1-out shape" in { + val info = (new PythonUDFOpDescV2).operatorInfo + info.userFriendlyName shouldBe "Python UDF" + info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP + info.dynamicInputPorts shouldBe true + info.dynamicOutputPorts shouldBe true + info.inputPorts should have length 1 + info.outputPorts should have length 1 + } + + "PythonUDFOpDescV2" should "default code/workers/flags" in { + val d = new PythonUDFOpDescV2 + d.code shouldBe "" + d.workers shouldBe 1 + d.retainInputColumns shouldBe false + d.defaultEnv shouldBe true + } + + "PythonUDFOpDescV2.getPhysicalOp" should + "wire OpExecWithCode(code, \"python\") and carry port identities" in { + val d = new PythonUDFOpDescV2 + d.code = "yield 
t" + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithCode(code, language) => + language shouldBe "python" + code shouldBe "yield t" + case other => fail(s"expected OpExecWithCode, got $other") + } + physical.inputPorts.keySet shouldBe d.operatorInfo.inputPorts.map(_.id).toSet + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } + + it should "reject a non-positive worker count" in { + val d = new PythonUDFOpDescV2 + d.workers = 0 + intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, executionId) } + } + + it should "reject a blank virtual-environment name when the default env is disabled" in { + val d = new PythonUDFOpDescV2 + d.defaultEnv = false + d.envName = " " + intercept[RuntimeException] { d.getPhysicalOp(workflowId, executionId) } + } + + "PythonUDFOpDescV2 schema propagation" should + "emit only the output columns when input columns are not retained (default)" in { + val d = new PythonUDFOpDescV2 + d.outputColumns = List(new Attribute("res", AttributeType.INTEGER)) + val input = Schema().add(new Attribute("in", AttributeType.STRING)) + val out = d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id -> input)) + out shouldBe Map( + d.operatorInfo.outputPorts.head.id -> Schema().add( + new Attribute("res", AttributeType.INTEGER) + ) + ) + } + + it should "retain input columns plus the output columns when retainInputColumns is true" in { + val d = new PythonUDFOpDescV2 + d.retainInputColumns = true + d.outputColumns = List(new Attribute("res", AttributeType.INTEGER)) + val input = Schema().add(new Attribute("in", AttributeType.STRING)) + val out = d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id -> input)) + out shouldBe Map( + d.operatorInfo.outputPorts.head.id -> Schema() + .add(new Attribute("in", AttributeType.STRING)) + .add(new Attribute("res", AttributeType.INTEGER)) + ) + } + + it should "reject an output column that 
collides with a retained input column" in { + val d = new PythonUDFOpDescV2 + d.retainInputColumns = true + d.outputColumns = List(new Attribute("dup", AttributeType.INTEGER)) + val input = Schema().add(new Attribute("dup", AttributeType.STRING)) + intercept[RuntimeException] { + d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id -> input)) + } + } + + "PythonUDFOpDescV2" should "round-trip its config fields through the polymorphic base" in { + val d = new PythonUDFOpDescV2 + d.code = "print(1)" + d.workers = 3 + d.retainInputColumns = true + d.defaultEnv = false + d.envName = "myenv" + d.outputColumns = List(new Attribute("res", AttributeType.INTEGER)) + val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) + restored shouldBe a[PythonUDFOpDescV2] + val p = restored.asInstanceOf[PythonUDFOpDescV2] + p.code shouldBe "print(1)" + p.workers shouldBe 3 + p.retainInputColumns shouldBe true + p.defaultEnv shouldBe false + p.envName shouldBe "myenv" + p.outputColumns shouldBe List(new Attribute("res", AttributeType.INTEGER)) + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/source/PythonUDFSourceOpDescV2Spec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/source/PythonUDFSourceOpDescV2Spec.scala new file mode 100644 index 0000000000..f0c3dfa41f --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/source/PythonUDFSourceOpDescV2Spec.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.python.source + +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class PythonUDFSourceOpDescV2Spec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "PythonUDFSourceOpDescV2.operatorInfo" should + "advertise the 1-out Python UDF source (no inputs, one output, reconfigurable)" in { + val info = (new PythonUDFSourceOpDescV2).operatorInfo + info.userFriendlyName shouldBe "1-out Python UDF" + info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP + info.inputPorts shouldBe empty + info.outputPorts should have length 1 + info.supportReconfiguration shouldBe true + } + + "PythonUDFSourceOpDescV2.sourceSchema" should "be empty by default and reflect the configured columns" in { + (new PythonUDFSourceOpDescV2).sourceSchema().getAttributes shouldBe empty + val d = new PythonUDFSourceOpDescV2 + d.columns = List(new Attribute("a", AttributeType.STRING)) + d.sourceSchema() shouldBe Schema().add(new Attribute("a", AttributeType.STRING)) + } + + 
"PythonUDFSourceOpDescV2.getPhysicalOp" should + "wire OpExecWithCode(code, \"python\") as a source op with one output port" in { + val d = new PythonUDFSourceOpDescV2 + d.code = "yield {'a': 1}" + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithCode(code, language) => + language shouldBe "python" + code shouldBe "yield {'a': 1}" + case other => fail(s"expected OpExecWithCode, got $other") + } + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } + + it should "reject a non-positive worker count" in { + val d = new PythonUDFSourceOpDescV2 + d.workers = 0 + intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, executionId) } + } + + "PythonUDFSourceOpDescV2" should "round-trip its config fields through the polymorphic base" in { + val d = new PythonUDFSourceOpDescV2 + d.code = "yield" + d.workers = 2 + d.defaultEnv = false + d.envName = "venv" + d.columns = List(new Attribute("a", AttributeType.STRING)) + val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) + restored shouldBe a[PythonUDFSourceOpDescV2] + val p = restored.asInstanceOf[PythonUDFSourceOpDescV2] + p.code shouldBe "yield" + p.workers shouldBe 2 + p.defaultEnv shouldBe false + p.envName shouldBe "venv" + p.columns shouldBe List(new Attribute("a", AttributeType.STRING)) + } +}
