This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5899-c0700ff24b16fb8779ca57ff161298b42a6556f0
in repository https://gitbox.apache.org/repos/asf/texera.git

commit e7eba322ca3cd7164187eae782620b4ebcd7f7a3
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Jun 23 10:43:30 2026 -0700

    test(workflow-operator): add unit test coverage for file scan source 
descriptors (CSVOld, JSONL, Arrow) (#5899)
    
    ### What changes were proposed in this PR?
    
    Pin behavior of three previously-untested file scan source descriptors
    in `common/workflow-operator`. No production-code changes.
    
    | Spec | Source class | Tests |
    | --- | --- | --- |
    | `CSVOldScanSourceOpDescSpec` | `CSVOldScanSourceOpDesc` | 6 |
    | `JSONLScanSourceOpDescSpec` | `JSONLScanSourceOpDesc` | 5 |
    | `ArrowSourceOpDescSpec` | `ArrowSourceOpDesc` | 5 |
    
    **Behavior pinned**
    
    | Surface | Contract |
    | --- | --- |
    | `operatorInfo` | exact name + description; Data Input group; 0 inputs
    / 1 output |
    | field defaults | per-type defaults (CSVOld
    `customDelimiter=Some(",")`, `hasHeader=true`; JSONL `flatten=false`;
    encoding `UTF_8`; `limit`/`offset` `None`; `fileTypeName`) |
    | `sourceSchema()` | `null` before a file is resolved (IO-free
    short-circuit) |
    | `getPhysicalOp` | wires `OpExecWithClassName` for the matching
    `*OpExec`, no input port / one output port, IO-free; CSVOld falls back
    to a comma on an empty delimiter; JSONL is parallelizable |
    | Round-trip | config fields preserved through the polymorphic
    `LogicalOp` base |
    
    ### Any related issues, documentation, discussions?
    
    Part of the ongoing `workflow-operator` unit-test coverage effort
    (follow-up to #5843, #5844).
    
    ### How was this PR tested?
    
    - `sbt "WorkflowOperator/testOnly *CSVOldScanSourceOpDescSpec
    *JSONLScanSourceOpDescSpec *ArrowSourceOpDescSpec"` — 16 tests, all
    green
    - `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt
    "WorkflowOperator/scalafixAll --check"` — clean
    - CI to confirm
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.8 [1M context])
---
 .../source/scan/arrow/ArrowSourceOpDescSpec.scala  | 84 +++++++++++++++++++
 .../scan/csvOld/CSVOldScanSourceOpDescSpec.scala   | 97 ++++++++++++++++++++++
 .../scan/json/JSONLScanSourceOpDescSpec.scala      | 88 ++++++++++++++++++++
 3 files changed, 269 insertions(+)

diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/arrow/ArrowSourceOpDescSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/arrow/ArrowSourceOpDescSpec.scala
new file mode 100644
index 0000000000..3cb4273cad
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/arrow/ArrowSourceOpDescSpec.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan.arrow
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.operator.source.scan.FileDecodingMethod
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class ArrowSourceOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  "ArrowSourceOpDesc.operatorInfo" should
+    "advertise the Arrow file-scan name in the Data Input group with no input 
and one output" in {
+    val info = (new ArrowSourceOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Arrow File Scan"
+    info.operatorDescription shouldBe "Scan data from an Arrow file"
+    info.operatorGroupName shouldBe OperatorGroupConstants.INPUT_GROUP
+    info.inputPorts shouldBe empty
+    info.outputPorts should have length 1
+  }
+
+  "ArrowSourceOpDesc" should "default the encoding and scan window" in {
+    val d = new ArrowSourceOpDesc
+    d.fileName shouldBe None
+    d.fileEncoding shouldBe FileDecodingMethod.UTF_8
+    d.limit shouldBe None
+    d.offset shouldBe None
+    d.fileTypeName shouldBe Some("Arrow")
+  }
+
+  "ArrowSourceOpDesc.sourceSchema" should "be null before a file is resolved" 
in {
+    (new ArrowSourceOpDesc).sourceSchema() shouldBe null
+  }
+
+  "ArrowSourceOpDesc.getPhysicalOp" should
+    "wire the Arrow exec as a source op with no input port and one output 
port" in {
+    val d = new ArrowSourceOpDesc
+    val physical = d.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, _) =>
+        className shouldBe 
"org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpExec"
+      case other => fail(s"expected OpExecWithClassName, got $other")
+    }
+    physical.inputPorts.keySet shouldBe empty
+    physical.outputPorts.keySet shouldBe 
d.operatorInfo.outputPorts.map(_.id).toSet
+  }
+
+  "ArrowSourceOpDesc" should "round-trip its config fields through the 
polymorphic base" in {
+    val d = new ArrowSourceOpDesc
+    d.fileName = Some("file:///tmp/data.arrow")
+    d.limit = Some(7)
+    d.offset = Some(3)
+    val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), 
classOf[LogicalOp])
+    restored shouldBe a[ArrowSourceOpDesc]
+    val r = restored.asInstanceOf[ArrowSourceOpDesc]
+    r.fileName shouldBe Some("file:///tmp/data.arrow")
+    r.limit shouldBe Some(7)
+    r.offset shouldBe Some(3)
+  }
+}
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDescSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDescSpec.scala
new file mode 100644
index 0000000000..e0d7efabf0
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDescSpec.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan.csvOld
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.operator.source.scan.FileDecodingMethod
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class CSVOldScanSourceOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  "CSVOldScanSourceOpDesc.operatorInfo" should
+    "advertise the CSVOld file-scan name in the Data Input group with no input 
and one output" in {
+    val info = (new CSVOldScanSourceOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "CSVOld File Scan"
+    info.operatorDescription shouldBe "Scan data from a CSVOld file"
+    info.operatorGroupName shouldBe OperatorGroupConstants.INPUT_GROUP
+    info.inputPorts shouldBe empty
+    info.outputPorts should have length 1
+  }
+
+  "CSVOldScanSourceOpDesc" should "default the delimiter, header flag, and 
scan window" in {
+    val d = new CSVOldScanSourceOpDesc
+    d.customDelimiter shouldBe Some(",")
+    d.hasHeader shouldBe true
+    d.fileName shouldBe None
+    d.fileEncoding shouldBe FileDecodingMethod.UTF_8
+    d.limit shouldBe None
+    d.offset shouldBe None
+    d.fileTypeName shouldBe Some("CSVOld")
+  }
+
+  "CSVOldScanSourceOpDesc.sourceSchema" should "be null before a file is 
resolved" in {
+    (new CSVOldScanSourceOpDesc).sourceSchema() shouldBe null
+  }
+
+  "CSVOldScanSourceOpDesc.getPhysicalOp" should
+    "wire the CSVOld exec as a source op with no input port and one output 
port" in {
+    val d = new CSVOldScanSourceOpDesc
+    val physical = d.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, _) =>
+        className shouldBe 
"org.apache.texera.amber.operator.source.scan.csvOld.CSVOldScanSourceOpExec"
+      case other => fail(s"expected OpExecWithClassName, got $other")
+    }
+    physical.inputPorts.keySet shouldBe empty
+    physical.outputPorts.keySet shouldBe 
d.operatorInfo.outputPorts.map(_.id).toSet
+  }
+
+  it should "fall back to a comma when the configured delimiter is empty" in {
+    val d = new CSVOldScanSourceOpDesc
+    d.customDelimiter = Some("")
+    d.getPhysicalOp(workflowId, executionId)
+    d.customDelimiter shouldBe Some(",")
+  }
+
+  "CSVOldScanSourceOpDesc" should "round-trip its config fields through the 
polymorphic base" in {
+    val d = new CSVOldScanSourceOpDesc
+    d.customDelimiter = Some(";")
+    d.hasHeader = false
+    d.fileEncoding = FileDecodingMethod.UTF_16
+    d.limit = Some(10)
+    d.offset = Some(5)
+    val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), 
classOf[LogicalOp])
+    restored shouldBe a[CSVOldScanSourceOpDesc]
+    val r = restored.asInstanceOf[CSVOldScanSourceOpDesc]
+    r.customDelimiter shouldBe Some(";")
+    r.hasHeader shouldBe false
+    r.fileEncoding shouldBe FileDecodingMethod.UTF_16
+    r.limit shouldBe Some(10)
+    r.offset shouldBe Some(5)
+  }
+}
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/json/JSONLScanSourceOpDescSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/json/JSONLScanSourceOpDescSpec.scala
new file mode 100644
index 0000000000..7f34281b19
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/json/JSONLScanSourceOpDescSpec.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan.json
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.operator.source.scan.FileDecodingMethod
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class JSONLScanSourceOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  "JSONLScanSourceOpDesc.operatorInfo" should
+    "advertise the JSONL file-scan name in the Data Input group with no input 
and one output" in {
+    val info = (new JSONLScanSourceOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "JSONL File Scan"
+    info.operatorDescription shouldBe "Scan data from a JSONL file"
+    info.operatorGroupName shouldBe OperatorGroupConstants.INPUT_GROUP
+    info.inputPorts shouldBe empty
+    info.outputPorts should have length 1
+  }
+
+  "JSONLScanSourceOpDesc" should "default the flatten flag, encoding, and scan 
window" in {
+    val d = new JSONLScanSourceOpDesc
+    d.flatten shouldBe false
+    d.fileName shouldBe None
+    d.fileEncoding shouldBe FileDecodingMethod.UTF_8
+    d.limit shouldBe None
+    d.offset shouldBe None
+    d.fileTypeName shouldBe Some("JSONL")
+  }
+
+  "JSONLScanSourceOpDesc.sourceSchema" should "be null before a file is 
resolved" in {
+    (new JSONLScanSourceOpDesc).sourceSchema() shouldBe null
+  }
+
+  "JSONLScanSourceOpDesc.getPhysicalOp" should
+    "wire the JSONL exec as a source op with no input port and one output 
port" in {
+    val d = new JSONLScanSourceOpDesc
+    val physical = d.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, _) =>
+        className shouldBe 
"org.apache.texera.amber.operator.source.scan.json.JSONLScanSourceOpExec"
+      case other => fail(s"expected OpExecWithClassName, got $other")
+    }
+    physical.parallelizable shouldBe true
+    physical.inputPorts.keySet shouldBe empty
+    physical.outputPorts.keySet shouldBe 
d.operatorInfo.outputPorts.map(_.id).toSet
+  }
+
+  "JSONLScanSourceOpDesc" should "round-trip its config fields through the 
polymorphic base" in {
+    val d = new JSONLScanSourceOpDesc
+    d.flatten = true
+    d.fileEncoding = FileDecodingMethod.UTF_16
+    d.limit = Some(10)
+    d.offset = Some(5)
+    val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), 
classOf[LogicalOp])
+    restored shouldBe a[JSONLScanSourceOpDesc]
+    val r = restored.asInstanceOf[JSONLScanSourceOpDesc]
+    r.flatten shouldBe true
+    r.fileEncoding shouldBe FileDecodingMethod.UTF_16
+    r.limit shouldBe Some(10)
+    r.offset shouldBe Some(5)
+  }
+}

Reply via email to