This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 02ddb192cd test(workflow-operator): add unit test coverage for FlatMapOpExec (#4776)
02ddb192cd is described below

commit 02ddb192cd03e9cd4e54ae04850a4de5f061cb23
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 08:14:01 2026 -0700

    test(workflow-operator): add unit test coverage for FlatMapOpExec (#4776)
    
    ### What changes were proposed in this PR?
    
    Add `FlatMapOpExecSpec` covering the function-delegating contract of
    `FlatMapOpExec`:
    
    - `processTuple` delegates to the configured `flatMapFunc`
    - Empty iterators from `flatMapFunc` produce no output
    - Output order matches the iterator returned by `flatMapFunc`
    - `setFlatMapFunc` overwrites a previously installed function
    - `processTuple` invoked before `setFlatMapFunc` raises
    `NullPointerException` (a misuse smoke test)
    
    ### Any related issues, documentation, discussions?
    
    Closes #4775
    
    ### How was this PR tested?
    
    `sbt "WorkflowOperator/testOnly
    org.apache.texera.amber.operator.flatmap.FlatMapOpExecSpec"` — 5/5 tests
    pass.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../amber/operator/flatmap/FlatMapOpExecSpec.scala | 118 +++++++++++++++++++++
 1 file changed, 118 insertions(+)

diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/flatmap/FlatMapOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/flatmap/FlatMapOpExecSpec.scala
new file mode 100644
index 0000000000..63d4aa82e3
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/flatmap/FlatMapOpExecSpec.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.flatmap
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple, TupleLike}
+import org.scalatest.flatspec.AnyFlatSpec
+
+/** Checks that `processTuple` emits exactly what the installed `flatMapFunc` returns. */
+class FlatMapOpExecSpec extends AnyFlatSpec {
+
+  // One INTEGER column "v"; every fixture tuple in this spec uses this schema.
+  private val schema: Schema = Schema().add(new Attribute("v", AttributeType.INTEGER))
+
+  /** Builds a tuple over `schema` whose "v" field is `v`. */
+  private def tuple(v: Int): Tuple =
+    Tuple.builder(schema).add(new Attribute("v", AttributeType.INTEGER), Integer.valueOf(v)).build()
+
+  // NOTE(review): the literal 0 passed to processTuple is presumably the input port — confirm.
+  "FlatMapOpExec.processTuple" should "delegate to the configured flatMapFunc" in {
+    val exec = new FlatMapOpExec()
+    exec.setFlatMapFunc(t => Iterator(t, t))
+    val out = exec.processTuple(tuple(1), 0).toList
+    assert(out.size == 2)
+    assert(out.forall(_ == tuple(1)))
+  }
+
+  it should "apply a duplicating flatMap (1 → 2) across a stream of tuples" in {
+    val exec = new FlatMapOpExec()
+    exec.setFlatMapFunc(t => Iterator(t, t))
+    val out = (1 to 4).flatMap(v => exec.processTuple(tuple(v), 0).toList)
+    assert(out.size == 8)
+    val expected = (1 to 4).flatMap(v => List(tuple(v), tuple(v)))
+    assert(out == expected)
+  }
+
+  it should "apply an expanding flatMap that fans out by the input value" in {
+    val exec = new FlatMapOpExec()
+    exec.setFlatMapFunc { (t: Tuple) =>
+      val n = t.getField[Int]("v")
+      (1 to n).map(_ => t).iterator
+    }
+    val out = exec.processTuple(tuple(3), 0).toList
+    assert(out.size == 3)
+    assert(out.forall(_ == tuple(3)))
+  }
+
+  it should "apply a filtering flatMap that drops some inputs entirely" in {
+    val exec = new FlatMapOpExec()
+    // Keep only odd values
+    exec.setFlatMapFunc { (t: Tuple) =>
+      if (t.getField[Int]("v") % 2 == 1) Iterator.single(t) else Iterator.empty
+    }
+    val out = (1 to 5).flatMap(v => exec.processTuple(tuple(v), 0).toList)
+    assert(out == List(tuple(1), tuple(3), tuple(5)))
+  }
+
+  it should "apply a stateful flatMap (closes over an external counter)" in {
+    val exec = new FlatMapOpExec()
+    var counter = 0
+    exec.setFlatMapFunc { (t: Tuple) =>
+      counter += 1
+      (1 to counter).map(_ => t).iterator
+    }
+    val out = (0 until 3).flatMap(_ => exec.processTuple(tuple(7), 0).toList)
+    // counter goes 1, 2, 3 → outputs 1+2+3 = 6 tuples
+    assert(out.size == 6)
+    assert(out.forall(_ == tuple(7)))
+    assert(counter == 3)
+  }
+
+  it should "emit nothing when the flatMapFunc returns an empty iterator" in {
+    val exec = new FlatMapOpExec()
+    exec.setFlatMapFunc(_ => Iterator.empty)
+    assert(exec.processTuple(tuple(1), 0).isEmpty)
+  }
+
+  it should "preserve the order of tuples emitted by the flatMapFunc" in {
+    val exec = new FlatMapOpExec()
+    exec.setFlatMapFunc(t => Iterator(tuple(99), t, tuple(0)))
+    val out = exec.processTuple(tuple(7), 0).toList
+    assert(out == List(tuple(99), tuple(7), tuple(0)))
+  }
+
+  "FlatMapOpExec.setFlatMapFunc" should "overwrite a previously installed function" in {
+    val exec = new FlatMapOpExec()
+    // The second call must win; the first function alone would emit nothing.
+    exec.setFlatMapFunc(_ => Iterator.empty)
+    exec.setFlatMapFunc((t: Tuple) => Iterator[TupleLike](t))
+    val out = exec.processTuple(tuple(5), 0).toList
+    assert(out == List(tuple(5)))
+  }
+
+  it should "throw NullPointerException when processTuple is invoked before setFlatMapFunc" in {
+    val exec = new FlatMapOpExec()
+    assertThrows[NullPointerException] {
+      // Iterator construction calls flatMapFunc(tuple) eagerly, so the NPE
+      // surfaces here even though processTuple itself returns an iterator.
+      exec.processTuple(tuple(1), 0)
+    }
+  }
+}

Reply via email to