This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 58824343df test(workflow-operator): add unit test coverage for 
IntervalJoin, JoinUtils, and OperatorGroupConstants (#5845)
58824343df is described below

commit 58824343dfd0f25ba9b79f34017743d6b3cdcdf3
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 20 23:49:49 2026 -0700

    test(workflow-operator): add unit test coverage for IntervalJoin, 
JoinUtils, and OperatorGroupConstants (#5845)
    
    ### What changes were proposed in this PR?
    
    Pin behavior of three join/utility classes in
    `common/workflow-operator/`. No production-code changes.
    
    | Spec | Source class | Tests |
    | --- | --- | --- |
    | `IntervalJoinOpDescSpec` | `IntervalJoinOpDesc` | 5 |
    | `JoinUtilsSpec` | `JoinUtils` (object) | 3 |
    | `OperatorGroupConstantsSpec` | `OperatorGroupConstants` (object) | 4 |
    
    **Behavior pinned — `IntervalJoinOpDesc`**
    
    | Surface | Contract |
    | --- | --- |
    | `operatorInfo` | `Interval Join`, JOIN_GROUP; two ordered inputs
    (`left table` at `PortIdentity()`, `right table` at `PortIdentity(1)`
    depending on the left); one output |
    | Field defaults | join-key attrs `null`; `constant == 10`;
    `includeLeftBound`/`includeRightBound == true` |
    | `getPhysicalOp` | wires `IntervalJoinOpExec`; port identities carried;
    `HashPartition` requirement on each join key |
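    | Schema propagation | merges left ⧺ right schemas, suffixing a
    conflicting attribute with `#@1` |
    | JSON round-trip | join keys, `constant`, and both bound flags survive
    serialization through the polymorphic `LogicalOp` base |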
    
    **Behavior pinned — `JoinUtils.joinTuples`**
    
    | Surface | Contract |
    | --- | --- |
    | Concatenation | left + right fields merged |
    | Skip | the named join-key attribute is dropped from the right side |
    | Conflict rename | a right-side name collision is renamed with a `#@1`
    suffix |
    
    **Behavior pinned — `OperatorGroupConstants`**
    
    | Surface | Contract |
    | --- | --- |
    | Constant values | the canonical group-name strings (`Data Input`,
    `Join`, `Set`, … `Control Block`) |
    | `OperatorGroupOrderList` | starts at `Data Input`, ends at `Control
    Block`; relational subgroups (Join/Set/Aggregate/Sort) nested under
    `Data Cleaning`; visualization subgroups nested under `Visualization` in
    panel order |
    
    ### Any related issues, documentation, discussions?
    
    Closes #5840.
    
    ### How was this PR tested?
    
    - `sbt "WorkflowOperator/testOnly
    org.apache.texera.amber.operator.intervalJoin.IntervalJoinOpDescSpec
    org.apache.texera.amber.operator.hashJoin.JoinUtilsSpec
    org.apache.texera.amber.operator.metadata.OperatorGroupConstantsSpec"` —
    12 tests, all green
    - `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt
    "WorkflowOperator/Test/scalafix --check"` — clean
    - CI to confirm
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.8 [1M context])
---
 .../amber/operator/hashJoin/JoinUtilsSpec.scala    |  93 ++++++++++++++++
 .../intervalJoin/IntervalJoinOpDescSpec.scala      | 121 +++++++++++++++++++++
 .../metadata/OperatorGroupConstantsSpec.scala      |  92 ++++++++++++++++
 3 files changed, 306 insertions(+)

diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/hashJoin/JoinUtilsSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/hashJoin/JoinUtilsSpec.scala
new file mode 100644
index 0000000000..04e2af16f8
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/hashJoin/JoinUtilsSpec.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.hashJoin
+
+import org.apache.texera.amber.core.tuple.{
+  Attribute,
+  AttributeType,
+  Schema,
+  SchemaEnforceable,
+  Tuple
+}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+// Unit tests for `JoinUtils.joinTuples`: plain concatenation of both sides,
+// skipping a named right-side attribute, and renaming a right-side name
+// collision with a `#@1` suffix.
+class JoinUtilsSpec extends AnyFlatSpec with Matchers {
+
+  // Builds a schema holding the given (name, type) pairs, in order.
+  private def schemaOf(attrs: (String, AttributeType)*): Schema =
+    attrs.foldLeft(Schema())((s, a) => s.add(new Attribute(a._1, a._2)))
+
+  // Builds a tuple of `schema` from (attribute name, value) pairs.
+  private def tupleOf(schema: Schema, values: (String, Any)*): Tuple = {
+    val b = Tuple.builder(schema)
+    values.foreach { case (name, v) => b.add(schema.getAttribute(name), v) }
+    b.build()
+  }
+
+  // Enforces `expected` on a join result so its fields can be read by name.
+  // A result that is not a SchemaEnforceable fails the test with a readable
+  // message instead of surfacing as a ClassCastException.
+  private def enforce(joined: AnyRef, expected: Schema): Tuple =
+    joined match {
+      case enforceable: SchemaEnforceable =>
+        enforceable.enforceSchema(expected)
+      case other =>
+        fail(s"expected a SchemaEnforceable join result, got $other")
+    }
+
+  "JoinUtils.joinTuples" should
+    "concatenate the left and right tuple fields" in {
+    val leftSchema =
+      schemaOf("a" -> AttributeType.STRING, "b" -> AttributeType.INTEGER)
+    val rightSchema = schemaOf("c" -> AttributeType.STRING)
+    val joined = JoinUtils.joinTuples(
+      tupleOf(leftSchema, "a" -> "av", "b" -> Integer.valueOf(1)),
+      tupleOf(rightSchema, "c" -> "cv")
+    )
+    joined.getFields.length shouldBe 3
+    val enforced = enforce(
+      joined,
+      schemaOf(
+        "a" -> AttributeType.STRING,
+        "b" -> AttributeType.INTEGER,
+        "c" -> AttributeType.STRING
+      )
+    )
+    enforced.getField[String]("a") shouldBe "av"
+    // The middle field is asserted as well, so a dropped or reordered
+    // left-side value cannot slip through unnoticed.
+    enforced.getField[Integer]("b") shouldBe Integer.valueOf(1)
+    enforced.getField[String]("c") shouldBe "cv"
+  }
+
+  it should
+    "skip the named attribute (the join key) from the right tuple" in {
+    val leftSchema = schemaOf("a" -> AttributeType.STRING)
+    val rightSchema =
+      schemaOf("k" -> AttributeType.STRING, "c" -> AttributeType.STRING)
+    val joined = JoinUtils.joinTuples(
+      tupleOf(leftSchema, "a" -> "av"),
+      tupleOf(rightSchema, "k" -> "kv", "c" -> "cv"),
+      skipAttributeName = Some("k")
+    )
+    // Only "a" and "c" remain: the right-side "k" was skipped.
+    joined.getFields.length shouldBe 2
+    val enforced = enforce(
+      joined,
+      schemaOf("a" -> AttributeType.STRING, "c" -> AttributeType.STRING)
+    )
+    enforced.getField[String]("a") shouldBe "av"
+    enforced.getField[String]("c") shouldBe "cv"
+  }
+
+  it should "rename a right-side name conflict with a #@1 suffix" in {
+    val schema = schemaOf("x" -> AttributeType.STRING)
+    val joined = JoinUtils.joinTuples(
+      tupleOf(schema, "x" -> "L"),
+      tupleOf(schema, "x" -> "R")
+    )
+    joined.getFields.length shouldBe 2
+    val enforced = enforce(
+      joined,
+      schemaOf("x" -> AttributeType.STRING, "x#@1" -> AttributeType.STRING)
+    )
+    // The left value keeps the original name; the right one gets the suffix.
+    enforced.getField[String]("x") shouldBe "L"
+    enforced.getField[String]("x#@1") shouldBe "R"
+  }
+}
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intervalJoin/IntervalJoinOpDescSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intervalJoin/IntervalJoinOpDescSpec.scala
new file mode 100644
index 0000000000..01d9d2e10b
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intervalJoin/IntervalJoinOpDescSpec.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.intervalJoin
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{HashPartition, PortIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+// Unit tests for `IntervalJoinOpDesc`: operator metadata, field defaults,
+// physical-op wiring, output-schema propagation, and JSON round-tripping.
+class IntervalJoinOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  // Descriptor with both join keys set and `timeIntervalType` assigned an
+  // explicit `None`, so the field holds a real Option (not null) before any
+  // path that serializes the descriptor (getPhysicalOp / round-trip).
+  private def configured(): IntervalJoinOpDesc = {
+    val desc = new IntervalJoinOpDesc
+    desc.leftAttributeName = "lk"
+    desc.rightAttributeName = "rk"
+    desc.timeIntervalType = None
+    desc
+  }
+
+  "IntervalJoinOpDesc.operatorInfo" should
+    "advertise two ordered inputs (left then right) in the Join group" in {
+    val info = (new IntervalJoinOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Interval Join"
+    info.operatorGroupName shouldBe OperatorGroupConstants.JOIN_GROUP
+
+    val inputs = info.inputPorts
+    inputs should have length 2
+    val leftInput = inputs.head
+    val rightInput = inputs.last
+    leftInput.id shouldBe PortIdentity()
+    leftInput.displayName shouldBe "left table"
+    rightInput.id shouldBe PortIdentity(1)
+    rightInput.displayName shouldBe "right table"
+    // The right input is declared as depending on the left input.
+    rightInput.dependencies shouldBe List(PortIdentity(0))
+
+    info.outputPorts should have length 1
+  }
+
+  "IntervalJoinOpDesc" should
+    "default the join-key attributes to null and the bounds/constant to their defaults" in {
+    val fresh = new IntervalJoinOpDesc
+    fresh.leftAttributeName shouldBe null
+    fresh.rightAttributeName shouldBe null
+    fresh.constant shouldBe 10L
+    fresh.includeLeftBound shouldBe true
+    fresh.includeRightBound shouldBe true
+  }
+
+  "IntervalJoinOpDesc.getPhysicalOp" should
+    "wire IntervalJoinOpExec, carry port identities, and require HashPartition on each join key" in {
+    val desc = configured()
+    val physicalOp = desc.getPhysicalOp(workflowId, executionId)
+
+    physicalOp.opExecInitInfo match {
+      case OpExecWithClassName(className, descString) =>
+        className shouldBe
+          "org.apache.texera.amber.operator.intervalJoin.IntervalJoinOpExec"
+        descString should not be empty
+      case other => fail(s"expected OpExecWithClassName, got $other")
+    }
+
+    val expectedInputIds = desc.operatorInfo.inputPorts.map(_.id).toSet
+    val expectedOutputIds = desc.operatorInfo.outputPorts.map(_.id).toSet
+    physicalOp.inputPorts.keySet shouldBe expectedInputIds
+    physicalOp.outputPorts.keySet shouldBe expectedOutputIds
+
+    // One hash-partition requirement per input, keyed on that side's join key.
+    physicalOp.partitionRequirement shouldBe
+      List("lk", "rk").map(key => Option(HashPartition(List(key))))
+  }
+
+  "IntervalJoinOpDesc schema propagation" should
+    "merge the left and right schemas, suffixing a conflicting attribute with #@1" in {
+    // A STRING attribute with the given name followed by a LONG "k"; both
+    // inputs share "k" so the merge has a name conflict to resolve.
+    def schemaWithK(stringAttr: String): Schema =
+      Schema()
+        .add(new Attribute(stringAttr, AttributeType.STRING))
+        .add(new Attribute("k", AttributeType.LONG))
+
+    val desc = configured()
+    val physicalOp = desc.getPhysicalOp(workflowId, executionId)
+    val inputSchemas = Map(
+      PortIdentity() -> schemaWithK("a"),
+      PortIdentity(1) -> schemaWithK("b")
+    )
+    val outputSchemas = physicalOp.propagateSchema.func(inputSchemas)
+
+    outputSchemas.keySet shouldBe desc.operatorInfo.outputPorts.map(_.id).toSet
+    val outputId = desc.operatorInfo.outputPorts.head.id
+    val outputNames = outputSchemas(outputId).getAttributes.map(_.getName)
+    outputNames shouldBe List("a", "k", "b", "k#@1")
+  }
+
+  "IntervalJoinOpDesc" should
+    "round-trip its fields through the polymorphic base" in {
+    val original = configured()
+    original.constant = 42L
+    original.includeLeftBound = false
+    original.includeRightBound = false
+
+    // Deserialize as the base type so the concrete class must be recovered
+    // from the serialized form itself.
+    val json = objectMapper.writeValueAsString(original)
+    val restored = objectMapper.readValue(json, classOf[LogicalOp])
+
+    restored shouldBe a[IntervalJoinOpDesc]
+    val roundTripped = restored.asInstanceOf[IntervalJoinOpDesc]
+    roundTripped.leftAttributeName shouldBe "lk"
+    roundTripped.rightAttributeName shouldBe "rk"
+    roundTripped.constant shouldBe 42L
+    roundTripped.includeLeftBound shouldBe false
+    roundTripped.includeRightBound shouldBe false
+  }
+}
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OperatorGroupConstantsSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OperatorGroupConstantsSpec.scala
new file mode 100644
index 0000000000..94ef1a040c
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OperatorGroupConstantsSpec.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.metadata
+
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+// Unit tests for `OperatorGroupConstants`: the literal value of every group
+// name constant and the layout of `OperatorGroupOrderList`.
+class OperatorGroupConstantsSpec extends AnyFlatSpec with Matchers {
+
+  import OperatorGroupConstants._
+
+  "OperatorGroupConstants" should
+    "pin the canonical group-name string value of every constant" in {
+    // (constant, expected literal) pairs, checked in declaration order.
+    val pinned = Seq(
+      INPUT_GROUP -> "Data Input",
+      DATABASE_GROUP -> "Database Connector",
+      SEARCH_GROUP -> "Search",
+      CLEANING_GROUP -> "Data Cleaning",
+      JOIN_GROUP -> "Join",
+      SET_GROUP -> "Set",
+      AGGREGATE_GROUP -> "Aggregate",
+      SORT_GROUP -> "Sort",
+      UTILITY_GROUP -> "Utilities",
+      API_GROUP -> "External API",
+      VISUALIZATION_GROUP -> "Visualization",
+      VISUALIZATION_BASIC_GROUP -> "Basic",
+      VISUALIZATION_STATISTICAL_GROUP -> "Statistical",
+      VISUALIZATION_SCIENTIFIC_GROUP -> "Scientific",
+      VISUALIZATION_FINANCIAL_GROUP -> "Financial",
+      VISUALIZATION_MEDIA_GROUP -> "Media",
+      VISUALIZATION_ADVANCED_GROUP -> "Advanced",
+      MACHINE_LEARNING_GROUP -> "Machine Learning",
+      ADVANCED_SKLEARN_GROUP -> "Advanced Sklearn",
+      HUGGINGFACE_GROUP -> "Hugging Face",
+      SKLEARN_GROUP -> "Sklearn",
+      SKLEARN_TRAINING_GROUP -> "Sklearn Training",
+      UDF_GROUP -> "User-defined Functions",
+      PYTHON_GROUP -> "Python",
+      JAVA_GROUP -> "Java",
+      R_GROUP -> "R",
+      MACHINE_LEARNING_GENERAL_GROUP -> "Machine Learning General",
+      CONTROL_GROUP -> "Control Block"
+    )
+    pinned.foreach {
+      case (constant, expected) => constant shouldBe expected
+    }
+  }
+
+  "OperatorGroupOrderList" should
+    "start at Data Input, contain Visualization, and place Control Block last" in {
+    val topLevelNames = OperatorGroupOrderList.map(_.groupName)
+    topLevelNames.head shouldBe INPUT_GROUP
+    topLevelNames.last shouldBe CONTROL_GROUP
+    topLevelNames should contain(VISUALIZATION_GROUP)
+  }
+
+  it should
+    "nest the relational subgroups (Join/Set/Aggregate/Sort) under Data Cleaning" in {
+    val expectedChildren =
+      List(JOIN_GROUP, SET_GROUP, AGGREGATE_GROUP, SORT_GROUP)
+    OperatorGroupOrderList.find(_.groupName == CLEANING_GROUP) match {
+      case Some(cleaning) =>
+        cleaning.children.map(_.groupName) shouldBe expectedChildren
+      case None =>
+        fail("Data Cleaning group missing from OperatorGroupOrderList")
+    }
+  }
+
+  it should
+    "nest the visualization subgroups under Visualization (in panel order)" in {
+    val expectedChildren = List(
+      VISUALIZATION_BASIC_GROUP,
+      VISUALIZATION_STATISTICAL_GROUP,
+      VISUALIZATION_SCIENTIFIC_GROUP,
+      VISUALIZATION_FINANCIAL_GROUP,
+      VISUALIZATION_MEDIA_GROUP,
+      VISUALIZATION_ADVANCED_GROUP
+    )
+    OperatorGroupOrderList.find(_.groupName == VISUALIZATION_GROUP) match {
+      case Some(viz) =>
+        viz.children.map(_.groupName) shouldBe expectedChildren
+      case None =>
+        fail("Visualization group missing from OperatorGroupOrderList")
+    }
+  }
+}

Reply via email to