This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5953-18a24e51e60835388e8bec8847520470c01ff55d in repository https://gitbox.apache.org/repos/asf/texera.git
commit ad908b74857b86f4bc6087b61fcbf9a54f738edb
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jun 26 01:35:28 2026 -0700

    test(workflow-operator): add unit test coverage for relational join descriptors (CartesianProduct, HashJoin) (#5953)

    ### What changes were proposed in this PR?

    Pin behavior of the two previously-untested relational join descriptors in `common/workflow-operator`. No production-code changes.

    | Spec | Source class | Tests |
    | --- | --- | --- |
    | `CartesianProductOpDescSpec` | `CartesianProductOpDesc` | 5 |
    | `HashJoinOpDescSpec` | `HashJoinOpDesc` | 5 |

    **Behavior pinned**

    | Surface | Contract |
    | --- | --- |
    | `operatorInfo` | exact name/description; Join group; left/right 2-in, 1-out shape |
    | field defaults | HashJoin `buildAttributeName`/`probeAttributeName` null, `joinType` `INNER` |
    | schema propagation | CartesianProduct concatenates left + right, renaming right-side duplicates (`k` → `k#@1`); HashJoin drops the probe key, keeps the build key, and renames retained right-side clashes |
    | `getPhysicalOp` | CartesianProduct wires `OpExecWithClassName(...CartesianProductOpExec)` with the correct ports |
    | Round-trip | config fields preserved through the polymorphic `LogicalOp` base, with the correct `operatorType` discriminator |

    ### Any related issues, documentation, discussions?

    Part of the ongoing `workflow-operator` unit-test coverage effort.

    ### How was this PR tested?

    - `sbt "WorkflowOperator/testOnly *CartesianProductOpDescSpec *HashJoinOpDescSpec"` — 10 tests, all green
    - `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt "WorkflowOperator/scalafixAll --check"` — clean
    - CI to confirm

    ### Was this PR authored or co-authored using generative AI tooling?
    Generated-by: Claude Code (Opus 4.8 [1M context])
---
 .../CartesianProductOpDescSpec.scala           | 92 ++++++++++++++++++++++
 .../operator/hashJoin/HashJoinOpDescSpec.scala | 92 ++++++++++++++++++++++
 2 files changed, 184 insertions(+)

diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/cartesianProduct/CartesianProductOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/cartesianProduct/CartesianProductOpDescSpec.scala
new file mode 100644
index 0000000000..9d8d3d7964
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/cartesianProduct/CartesianProductOpDescSpec.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.cartesianProduct
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+/** Descriptor-level specs for CartesianProductOpDesc; none of them execute the operator. */
+class CartesianProductOpDescSpec extends AnyFlatSpec with Matchers {
+  // Arbitrary ids: only getPhysicalOp consumes them, and no test asserts on their values.
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+  // operatorInfo: display name, description, operator group, and input/output port layout.
+  "CartesianProductOpDesc.operatorInfo" should
+    "advertise the Cartesian Product in the Join group with a left/right 2-in 1-out shape" in {
+      val info = (new CartesianProductOpDesc).operatorInfo
+      info.userFriendlyName shouldBe "Cartesian Product"
+      info.operatorDescription shouldBe
+        "Append fields together to get the cartesian product of two inputs"
+      info.operatorGroupName shouldBe OperatorGroupConstants.JOIN_GROUP
+      info.inputPorts.map(_.displayName) shouldBe List("left", "right")
+      info.outputPorts should have length 1
+    }
+  // Both inputs carry a "k"; the right-side copy is expected back as "k#@1".
+  "CartesianProductOpDesc.getExternalOutputSchemas" should
+    "concatenate the two input schemas, renaming right-side duplicates" in {
+      val d = new CartesianProductOpDesc
+      val left = Schema()
+        .add(new Attribute("a", AttributeType.STRING))
+        .add(new Attribute("k", AttributeType.LONG))
+      val right = Schema()
+        .add(new Attribute("b", AttributeType.STRING))
+        .add(new Attribute("k", AttributeType.LONG))
+      val out = d.getExternalOutputSchemas(
+        Map(PortIdentity() -> left, PortIdentity(1) -> right)
+      )
+      out(d.operatorInfo.outputPorts.head.id).getAttributeNames shouldBe List("a", "k", "b", "k#@1")
+    }
+  // Disjoint names: left attributes followed by right attributes, none renamed.
+  it should "keep both schemas intact when there are no name clashes" in {
+    val d = new CartesianProductOpDesc
+    val left = Schema().add(new Attribute("a", AttributeType.STRING))
+    val right = Schema().add(new Attribute("b", AttributeType.STRING))
+    val out = d.getExternalOutputSchemas(
+      Map(PortIdentity() -> left, PortIdentity(1) -> right)
+    )
+    out(d.operatorInfo.outputPorts.head.id).getAttributeNames shouldBe List("a", "b")
+  }
+  // Exec class name is pinned; port sets must match the ones declared in operatorInfo.
+  "CartesianProductOpDesc.getPhysicalOp" should
+    "wire the Cartesian Product exec with two input ports and one output port" in {
+      val d = new CartesianProductOpDesc
+      val physical = d.getPhysicalOp(workflowId, executionId)
+      physical.opExecInitInfo match {
+        case OpExecWithClassName(className, _) =>
+          className shouldBe "org.apache.texera.amber.operator.cartesianProduct.CartesianProductOpExec"
+        case other => fail(s"expected OpExecWithClassName, got $other")
+      }
+      physical.inputPorts.keySet shouldBe d.operatorInfo.inputPorts.map(_.id).toSet
+      physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet
+    }
+  // Serialize with JSONUtils.objectMapper, then read back through the LogicalOp base type.
+  "CartesianProductOpDesc" should "round-trip through the polymorphic base" in {
+    val d = new CartesianProductOpDesc
+    val json = objectMapper.writeValueAsString(d)
+    json should include("\"operatorType\":\"CartesianProduct\"")
+    objectMapper.readValue(json, classOf[LogicalOp]) shouldBe a[CartesianProductOpDesc]
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/hashJoin/HashJoinOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/hashJoin/HashJoinOpDescSpec.scala
new file mode 100644
index 0000000000..4ead90b645
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/hashJoin/HashJoinOpDescSpec.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.hashJoin
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+/** Descriptor-level specs for HashJoinOpDesc; none of them execute the join. */
+class HashJoinOpDescSpec extends AnyFlatSpec with Matchers {
+  // Fresh schemas per call: left = (a: STRING, k: LONG), right = (b: STRING, k: LONG).
+  private def leftRight(): (Schema, Schema) =
+    (
+      Schema()
+        .add(new Attribute("a", AttributeType.STRING))
+        .add(new Attribute("k", AttributeType.LONG)),
+      Schema()
+        .add(new Attribute("b", AttributeType.STRING))
+        .add(new Attribute("k", AttributeType.LONG))
+    )
+  // operatorInfo: display name, description, operator group, and input/output port layout.
+  "HashJoinOpDesc.operatorInfo" should
+    "advertise the Hash Join in the Join group with a left/right 2-in 1-out shape" in {
+      val info = (new HashJoinOpDesc[String]).operatorInfo
+      info.userFriendlyName shouldBe "Hash Join"
+      info.operatorDescription shouldBe "join two inputs"
+      info.operatorGroupName shouldBe OperatorGroupConstants.JOIN_GROUP
+      info.inputPorts.map(_.displayName) shouldBe List("left", "right")
+      info.outputPorts should have length 1
+    }
+  // Defaults of a freshly constructed descriptor, before any join keys are assigned.
+  "HashJoinOpDesc" should "default the join keys to null and the join type to inner" in {
+    val d = new HashJoinOpDesc[String]
+    d.buildAttributeName shouldBe null
+    d.probeAttributeName shouldBe null
+    d.joinType shouldBe JoinType.INNER
+  }
+  // Build and probe keys are both "k": a single "k" survives, in the left-side position.
+  "HashJoinOpDesc.getExternalOutputSchemas" should
+    "drop the probe key and keep the build key when join columns share a name" in {
+      val d = new HashJoinOpDesc[String]
+      d.buildAttributeName = "k"
+      d.probeAttributeName = "k"
+      val (left, right) = leftRight()
+      val out = d.getExternalOutputSchemas(Map(PortIdentity() -> left, PortIdentity(1) -> right))
+      out(d.operatorInfo.outputPorts.head.id).getAttributeNames shouldBe List("a", "k", "b")
+    }
+  // Probe "b" dropped, right "k" -> "k#@1". NOTE(review): keys are LONG vs STRING here.
+  it should "rename a retained right-side column that clashes with a left-side name" in {
+    val d = new HashJoinOpDesc[String]
+    d.buildAttributeName = "k"
+    d.probeAttributeName = "b"
+    val (left, right) = leftRight()
+    val out = d.getExternalOutputSchemas(Map(PortIdentity() -> left, PortIdentity(1) -> right))
+    out(d.operatorInfo.outputPorts.head.id).getAttributeNames shouldBe List("a", "k", "k#@1")
+  }
+  // Every config field is set to a non-default value, so one lost in the round-trip fails.
+  "HashJoinOpDesc" should "round-trip its config fields through the polymorphic base" in {
+    val d = new HashJoinOpDesc[String]
+    d.buildAttributeName = "lk"
+    d.probeAttributeName = "rk"
+    d.joinType = JoinType.LEFT_OUTER
+    val json = objectMapper.writeValueAsString(d)
+    json should include("\"operatorType\":\"HashJoin\"")
+    val restored = objectMapper.readValue(json, classOf[LogicalOp])
+    restored shouldBe a[HashJoinOpDesc[_]]
+    val r = restored.asInstanceOf[HashJoinOpDesc[String]]
+    r.buildAttributeName shouldBe "lk"
+    r.probeAttributeName shouldBe "rk"
+    r.joinType shouldBe JoinType.LEFT_OUTER
+  }
+}
