This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5952-7a38b6cf4c476e6f7ae0b0555fd711b50f4e85ec in repository https://gitbox.apache.org/repos/asf/texera.git
commit cc9eff7c6575f4d5eea38c183f0632d96e480cdc Author: Xinyuan Lin <[email protected]> AuthorDate: Fri Jun 26 03:06:58 2026 -0700 test(workflow-operator): add unit test coverage for Sklearn prediction/testing/linear-regression descriptors (#5952) ### What changes were proposed in this PR? Pin behavior of three previously-untested Sklearn model-lifecycle descriptors in `common/workflow-operator`. No production-code changes. Unlike the classifier family, these extend `PythonOperatorDescriptor` directly, so each has its own contract. | Spec | Source class | Tests | | --- | --- | --- | | `SklearnLinearRegressionOpDescSpec` | `SklearnLinearRegressionOpDesc` | 5 | | `SklearnPredictionOpDescSpec` | `SklearnPredictionOpDesc` | 7 | | `SklearnTestingOpDescSpec` | `SklearnTestingOpDesc` | 6 | **Behavior pinned** | Surface | Contract | | --- | --- | | `operatorInfo` | exact name/description; Sklearn group; correct multi-port shapes (LinearRegression training/testing + blocking output; Prediction model/data; Testing model/data) | | field defaults | LinearRegression `target` null / `degree` 1; Prediction `model`/`resultAttribute` null, `groundTruthAttribute` `""`; Testing `isRegression` false, `model`/`target` null | | `getOutputSchemas` | LinearRegression → `model_name`/`model`; Prediction → input (port 1) + result attribute (typed after the ground-truth column when one is configured; throws `NoSuchElementException` if that column is missing from the input); Testing → input (port 0) + classification (`accuracy`/`f1`/`precision`/`recall`) or regression (`R2`/`RMSE`/`MAE`) metric columns | | `generatePythonCode` | emits the expected estimator import / pipeline / scorer fragments | | Round-trip | config fields preserved through the polymorphic `LogicalOp` base, with the correct `operatorType` discriminator | ### Any related issues, documentation, discussions? Part of the ongoing `workflow-operator` unit-test coverage effort. ### How was this PR tested? 
- `sbt "WorkflowOperator/testOnly *SklearnLinearRegressionOpDescSpec *SklearnPredictionOpDescSpec *SklearnTestingOpDescSpec"` — 18 tests, all green - `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt "WorkflowOperator/scalafixAll --check"` — clean - CI to confirm ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (Opus 4.8 [1M context]) --- .../SklearnLinearRegressionOpDescSpec.scala | 77 +++++++++++++++ .../sklearn/SklearnPredictionOpDescSpec.scala | 108 +++++++++++++++++++++ .../sklearn/testing/SklearnTestingOpDescSpec.scala | 98 +++++++++++++++++++ 3 files changed, 283 insertions(+) diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sklearn/SklearnLinearRegressionOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sklearn/SklearnLinearRegressionOpDescSpec.scala new file mode 100644 index 0000000000..49d4156c05 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sklearn/SklearnLinearRegressionOpDescSpec.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.operator.sklearn + +import org.apache.texera.amber.core.tuple.AttributeType +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class SklearnLinearRegressionOpDescSpec extends AnyFlatSpec with Matchers { + + "SklearnLinearRegressionOpDesc.operatorInfo" should + "advertise the model name, Sklearn group, and the training/testing port shape" in { + val info = (new SklearnLinearRegressionOpDesc).operatorInfo + info.userFriendlyName shouldBe "Linear Regression" + info.operatorDescription shouldBe "Sklearn Linear Regression Operator" + info.operatorGroupName shouldBe OperatorGroupConstants.SKLEARN_GROUP + info.inputPorts.map(_.displayName) shouldBe List("training", "testing") + info.outputPorts should have length 1 + info.outputPorts.head.blocking shouldBe true + } + + "SklearnLinearRegressionOpDesc" should "default target to null and degree to 1" in { + val d = new SklearnLinearRegressionOpDesc + d.target shouldBe null + d.degree shouldBe 1 + } + + "SklearnLinearRegressionOpDesc.getOutputSchemas" should + "emit the model_name/model schema keyed by the declared output port" in { + val d = new SklearnLinearRegressionOpDesc + val schema = d.getOutputSchemas(Map.empty)(d.operatorInfo.outputPorts.head.id) + schema.getAttribute("model_name").getType shouldBe AttributeType.STRING + schema.getAttribute("model").getType shouldBe AttributeType.BINARY + } + + "SklearnLinearRegressionOpDesc.generatePythonCode" should + "build a polynomial LinearRegression pipeline" in { + val d = new SklearnLinearRegressionOpDesc + d.target = "y" + val code = d.generatePythonCode() + code should include("from sklearn.linear_model import LinearRegression") + code should include("from sklearn.preprocessing import PolynomialFeatures") + code 
should include("make_pipeline") + code should include("class ProcessTableOperator(UDFTableOperator)") + } + + "SklearnLinearRegressionOpDesc" should + "round-trip its target through the polymorphic base" in { + val d = new SklearnLinearRegressionOpDesc + d.target = "label" + val json = objectMapper.writeValueAsString(d) + json should include("\"operatorType\":\"SklearnLinearRegression\"") + val restored = objectMapper.readValue(json, classOf[LogicalOp]) + restored shouldBe a[SklearnLinearRegressionOpDesc] + restored.asInstanceOf[SklearnLinearRegressionOpDesc].target shouldBe "label" + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sklearn/SklearnPredictionOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sklearn/SklearnPredictionOpDescSpec.scala new file mode 100644 index 0000000000..2b5a76284a --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sklearn/SklearnPredictionOpDescSpec.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.operator.sklearn + +import org.apache.texera.amber.core.tuple.{AttributeType, Schema} +import org.apache.texera.amber.core.workflow.PortIdentity +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class SklearnPredictionOpDescSpec extends AnyFlatSpec with Matchers { + + "SklearnPredictionOpDesc.operatorInfo" should + "advertise the name, Sklearn group, and a model/data 2-in 1-out shape" in { + val info = (new SklearnPredictionOpDesc).operatorInfo + info.userFriendlyName shouldBe "Sklearn Prediction" + info.operatorDescription shouldBe "Sklearn Prediction Operator" + info.operatorGroupName shouldBe OperatorGroupConstants.SKLEARN_GROUP + info.inputPorts should have length 2 + info.inputPorts.head.displayName shouldBe "model" + info.outputPorts should have length 1 + } + + "SklearnPredictionOpDesc" should "default its attribute fields" in { + val d = new SklearnPredictionOpDesc + d.model shouldBe null + d.resultAttribute shouldBe null + d.groundTruthAttribute shouldBe "" + } + + "SklearnPredictionOpDesc.getOutputSchemas" should + "append the result attribute to the data (port 1) schema" in { + val d = new SklearnPredictionOpDesc + d.resultAttribute = "prediction" + val data = Schema().add("feature", AttributeType.STRING) + val out = d.getOutputSchemas(Map(PortIdentity(1) -> data)) + val schema = out(d.operatorInfo.outputPorts.head.id) + schema.getAttribute("feature").getType shouldBe AttributeType.STRING + schema.getAttribute("prediction").getType shouldBe AttributeType.STRING + } + + it should "derive the result column type from the configured ground-truth column" in { + val d = new SklearnPredictionOpDesc + d.resultAttribute = "prediction" + d.groundTruthAttribute = "label" + val data = Schema() + 
.add("feature", AttributeType.STRING) + .add("label", AttributeType.INTEGER) + val out = d.getOutputSchemas(Map(PortIdentity(1) -> data)) + out(d.operatorInfo.outputPorts.head.id) + .getAttribute("prediction") + .getType shouldBe AttributeType.INTEGER + } + + it should "throw when the configured ground-truth attribute is absent from the input schema" in { + val d = new SklearnPredictionOpDesc + d.resultAttribute = "prediction" + d.groundTruthAttribute = "missing" + val data = Schema().add("feature", AttributeType.STRING) + intercept[NoSuchElementException] { + d.getOutputSchemas(Map(PortIdentity(1) -> data)) + } + } + + "SklearnPredictionOpDesc.generatePythonCode" should "emit the model-applying tuple operator" in { + val d = new SklearnPredictionOpDesc + d.model = "model" + d.resultAttribute = "prediction" + val code = d.generatePythonCode() + code should include("class ProcessTupleOperator(UDFOperatorV2)") + code should include("from sklearn.pipeline import Pipeline") + code should include(".predict(") + code should include("yield tuple_") + } + + "SklearnPredictionOpDesc" should + "round-trip its config fields through the polymorphic base" in { + val d = new SklearnPredictionOpDesc + d.model = "m" + d.resultAttribute = "p" + val json = objectMapper.writeValueAsString(d) + json should include("\"operatorType\":\"SklearnPrediction\"") + val restored = objectMapper.readValue(json, classOf[LogicalOp]) + restored shouldBe a[SklearnPredictionOpDesc] + val r = restored.asInstanceOf[SklearnPredictionOpDesc] + r.model shouldBe "m" + r.resultAttribute shouldBe "p" + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sklearn/testing/SklearnTestingOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sklearn/testing/SklearnTestingOpDescSpec.scala new file mode 100644 index 0000000000..8c93200500 --- /dev/null +++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sklearn/testing/SklearnTestingOpDescSpec.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.sklearn.testing + +import org.apache.texera.amber.core.tuple.{AttributeType, Schema} +import org.apache.texera.amber.core.workflow.PortIdentity +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class SklearnTestingOpDescSpec extends AnyFlatSpec with Matchers { + + "SklearnTestingOpDesc.operatorInfo" should + "advertise the name, Sklearn group, and a model/data 2-in 1-out shape" in { + val info = (new SklearnTestingOpDesc).operatorInfo + info.userFriendlyName shouldBe "Sklearn Testing" + info.operatorDescription shouldBe "It will generate scorers for Sklearn model" + info.operatorGroupName shouldBe OperatorGroupConstants.SKLEARN_GROUP + info.inputPorts.map(_.displayName) shouldBe List("model", "data") + info.outputPorts should have length 1 + } + + 
"SklearnTestingOpDesc" should "default isRegression false and the attribute fields to null" in { + val d = new SklearnTestingOpDesc + d.isRegression shouldBe false + d.model shouldBe null + d.target shouldBe null + } + + "SklearnTestingOpDesc.getOutputSchemas" should + "append the classification metric columns for the default (non-regression) case" in { + val d = new SklearnTestingOpDesc + val input = Schema().add("x", AttributeType.STRING) + val schema = + d.getOutputSchemas(Map(PortIdentity() -> input))(d.operatorInfo.outputPorts.head.id) + schema.getAttribute("x").getType shouldBe AttributeType.STRING + schema.getAttribute("accuracy").getType shouldBe AttributeType.DOUBLE + schema.getAttribute("f1").getType shouldBe AttributeType.DOUBLE + schema.getAttribute("precision").getType shouldBe AttributeType.DOUBLE + schema.getAttribute("recall").getType shouldBe AttributeType.DOUBLE + } + + it should "append the regression metric columns when isRegression is true" in { + val d = new SklearnTestingOpDesc + d.isRegression = true + val input = Schema().add("x", AttributeType.STRING) + val schema = + d.getOutputSchemas(Map(PortIdentity() -> input))(d.operatorInfo.outputPorts.head.id) + schema.getAttribute("R2").getType shouldBe AttributeType.DOUBLE + schema.getAttribute("RMSE").getType shouldBe AttributeType.DOUBLE + schema.getAttribute("MAE").getType shouldBe AttributeType.DOUBLE + } + + "SklearnTestingOpDesc.generatePythonCode" should "emit the scorer tuple operator" in { + val d = new SklearnTestingOpDesc + d.model = "model" + d.target = "y" + val code = d.generatePythonCode() + code should include("class ProcessTupleOperator(UDFOperatorV2)") + code should include("from sklearn.metrics import") + code should include(".predict(") + } + + "SklearnTestingOpDesc" should + "round-trip its config fields through the polymorphic base" in { + val d = new SklearnTestingOpDesc + d.isRegression = true + d.model = "m" + d.target = "t" + val json = objectMapper.writeValueAsString(d) 
+ json should include("\"operatorType\":\"SklearnTesting\"") + val restored = objectMapper.readValue(json, classOf[LogicalOp]) + restored shouldBe a[SklearnTestingOpDesc] + val r = restored.asInstanceOf[SklearnTestingOpDesc] + r.isRegression shouldBe true + r.model shouldBe "m" + r.target shouldBe "t" + } +}
