This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5899-c0700ff24b16fb8779ca57ff161298b42a6556f0 in repository https://gitbox.apache.org/repos/asf/texera.git
commit e7eba322ca3cd7164187eae782620b4ebcd7f7a3
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Jun 23 10:43:30 2026 -0700

test(workflow-operator): add unit test coverage for file scan source descriptors (CSVOld, JSONL, Arrow) (#5899)

### What changes were proposed in this PR?

Pin behavior of three previously-untested file scan source descriptors in `common/workflow-operator`. No production-code changes.

| Spec | Source class | Tests |
| --- | --- | --- |
| `CSVOldScanSourceOpDescSpec` | `CSVOldScanSourceOpDesc` | 6 |
| `JSONLScanSourceOpDescSpec` | `JSONLScanSourceOpDesc` | 5 |
| `ArrowSourceOpDescSpec` | `ArrowSourceOpDesc` | 5 |

**Behavior pinned**

| Surface | Contract |
| --- | --- |
| `operatorInfo` | exact name + description; Data Input group; 0 inputs / 1 output |
| field defaults | per-type defaults (CSVOld `customDelimiter=Some(",")`, `hasHeader=true`; JSONL `flatten=false`; encoding `UTF_8`; `limit`/`offset` `None`; `fileTypeName`) |
| `sourceSchema()` | `null` before a file is resolved (IO-free short-circuit) |
| `getPhysicalOp` | wires `OpExecWithClassName` for the matching `*OpExec`, no input port / one output port, IO-free; CSVOld falls back to a comma on an empty delimiter; JSONL is parallelizable |
| Round-trip | config fields preserved through the polymorphic `LogicalOp` base |

### Any related issues, documentation, discussions?

Part of the ongoing `workflow-operator` unit-test coverage effort (follow-up to #5843, #5844).

### How was this PR tested?

- `sbt "WorkflowOperator/testOnly *CSVOldScanSourceOpDescSpec *JSONLScanSourceOpDescSpec *ArrowSourceOpDescSpec"` — 16 tests, all green
- `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt "WorkflowOperator/scalafixAll --check"` — clean
- CI to confirm

### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.8 [1M context]) --- .../source/scan/arrow/ArrowSourceOpDescSpec.scala | 84 +++++++++++++++++++ .../scan/csvOld/CSVOldScanSourceOpDescSpec.scala | 97 ++++++++++++++++++++++ .../scan/json/JSONLScanSourceOpDescSpec.scala | 88 ++++++++++++++++++++ 3 files changed, 269 insertions(+) diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/arrow/ArrowSourceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/arrow/ArrowSourceOpDescSpec.scala new file mode 100644 index 0000000000..3cb4273cad --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/arrow/ArrowSourceOpDescSpec.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.operator.source.scan.arrow + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.operator.source.scan.FileDecodingMethod +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class ArrowSourceOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "ArrowSourceOpDesc.operatorInfo" should + "advertise the Arrow file-scan name in the Data Input group with no input and one output" in { + val info = (new ArrowSourceOpDesc).operatorInfo + info.userFriendlyName shouldBe "Arrow File Scan" + info.operatorDescription shouldBe "Scan data from an Arrow file" + info.operatorGroupName shouldBe OperatorGroupConstants.INPUT_GROUP + info.inputPorts shouldBe empty + info.outputPorts should have length 1 + } + + "ArrowSourceOpDesc" should "default the encoding and scan window" in { + val d = new ArrowSourceOpDesc + d.fileName shouldBe None + d.fileEncoding shouldBe FileDecodingMethod.UTF_8 + d.limit shouldBe None + d.offset shouldBe None + d.fileTypeName shouldBe Some("Arrow") + } + + "ArrowSourceOpDesc.sourceSchema" should "be null before a file is resolved" in { + (new ArrowSourceOpDesc).sourceSchema() shouldBe null + } + + "ArrowSourceOpDesc.getPhysicalOp" should + "wire the Arrow exec as a source op with no input port and one output port" in { + val d = new ArrowSourceOpDesc + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithClassName(className, _) => + className shouldBe 
"org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpExec" + case other => fail(s"expected OpExecWithClassName, got $other") + } + physical.inputPorts.keySet shouldBe empty + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } + + "ArrowSourceOpDesc" should "round-trip its config fields through the polymorphic base" in { + val d = new ArrowSourceOpDesc + d.fileName = Some("file:///tmp/data.arrow") + d.limit = Some(7) + d.offset = Some(3) + val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) + restored shouldBe a[ArrowSourceOpDesc] + val r = restored.asInstanceOf[ArrowSourceOpDesc] + r.fileName shouldBe Some("file:///tmp/data.arrow") + r.limit shouldBe Some(7) + r.offset shouldBe Some(3) + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDescSpec.scala new file mode 100644 index 0000000000..e0d7efabf0 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDescSpec.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.source.scan.csvOld + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.operator.source.scan.FileDecodingMethod +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class CSVOldScanSourceOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "CSVOldScanSourceOpDesc.operatorInfo" should + "advertise the CSVOld file-scan name in the Data Input group with no input and one output" in { + val info = (new CSVOldScanSourceOpDesc).operatorInfo + info.userFriendlyName shouldBe "CSVOld File Scan" + info.operatorDescription shouldBe "Scan data from a CSVOld file" + info.operatorGroupName shouldBe OperatorGroupConstants.INPUT_GROUP + info.inputPorts shouldBe empty + info.outputPorts should have length 1 + } + + "CSVOldScanSourceOpDesc" should "default the delimiter, header flag, and scan window" in { + val d = new CSVOldScanSourceOpDesc + d.customDelimiter shouldBe Some(",") + d.hasHeader shouldBe true + d.fileName shouldBe None + d.fileEncoding shouldBe FileDecodingMethod.UTF_8 + d.limit shouldBe None + d.offset shouldBe None + d.fileTypeName shouldBe Some("CSVOld") + } + + "CSVOldScanSourceOpDesc.sourceSchema" should "be null before a file is resolved" in { + (new CSVOldScanSourceOpDesc).sourceSchema() shouldBe null + } + + "CSVOldScanSourceOpDesc.getPhysicalOp" should + "wire the CSVOld exec as a source op with no input port and one output port" in { + val d = new 
CSVOldScanSourceOpDesc + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithClassName(className, _) => + className shouldBe "org.apache.texera.amber.operator.source.scan.csvOld.CSVOldScanSourceOpExec" + case other => fail(s"expected OpExecWithClassName, got $other") + } + physical.inputPorts.keySet shouldBe empty + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } + + it should "fall back to a comma when the configured delimiter is empty" in { + val d = new CSVOldScanSourceOpDesc + d.customDelimiter = Some("") + d.getPhysicalOp(workflowId, executionId) + d.customDelimiter shouldBe Some(",") + } + + "CSVOldScanSourceOpDesc" should "round-trip its config fields through the polymorphic base" in { + val d = new CSVOldScanSourceOpDesc + d.customDelimiter = Some(";") + d.hasHeader = false + d.fileEncoding = FileDecodingMethod.UTF_16 + d.limit = Some(10) + d.offset = Some(5) + val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) + restored shouldBe a[CSVOldScanSourceOpDesc] + val r = restored.asInstanceOf[CSVOldScanSourceOpDesc] + r.customDelimiter shouldBe Some(";") + r.hasHeader shouldBe false + r.fileEncoding shouldBe FileDecodingMethod.UTF_16 + r.limit shouldBe Some(10) + r.offset shouldBe Some(5) + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/json/JSONLScanSourceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/json/JSONLScanSourceOpDescSpec.scala new file mode 100644 index 0000000000..7f34281b19 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/json/JSONLScanSourceOpDescSpec.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.source.scan.json + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.operator.source.scan.FileDecodingMethod +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class JSONLScanSourceOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "JSONLScanSourceOpDesc.operatorInfo" should + "advertise the JSONL file-scan name in the Data Input group with no input and one output" in { + val info = (new JSONLScanSourceOpDesc).operatorInfo + info.userFriendlyName shouldBe "JSONL File Scan" + info.operatorDescription shouldBe "Scan data from a JSONL file" + info.operatorGroupName shouldBe OperatorGroupConstants.INPUT_GROUP + info.inputPorts shouldBe empty + info.outputPorts should have length 1 + } + + "JSONLScanSourceOpDesc" should "default the flatten flag, encoding, and scan window" in 
{ + val d = new JSONLScanSourceOpDesc + d.flatten shouldBe false + d.fileName shouldBe None + d.fileEncoding shouldBe FileDecodingMethod.UTF_8 + d.limit shouldBe None + d.offset shouldBe None + d.fileTypeName shouldBe Some("JSONL") + } + + "JSONLScanSourceOpDesc.sourceSchema" should "be null before a file is resolved" in { + (new JSONLScanSourceOpDesc).sourceSchema() shouldBe null + } + + "JSONLScanSourceOpDesc.getPhysicalOp" should + "wire the JSONL exec as a source op with no input port and one output port" in { + val d = new JSONLScanSourceOpDesc + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithClassName(className, _) => + className shouldBe "org.apache.texera.amber.operator.source.scan.json.JSONLScanSourceOpExec" + case other => fail(s"expected OpExecWithClassName, got $other") + } + physical.parallelizable shouldBe true + physical.inputPorts.keySet shouldBe empty + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } + + "JSONLScanSourceOpDesc" should "round-trip its config fields through the polymorphic base" in { + val d = new JSONLScanSourceOpDesc + d.flatten = true + d.fileEncoding = FileDecodingMethod.UTF_16 + d.limit = Some(10) + d.offset = Some(5) + val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) + restored shouldBe a[JSONLScanSourceOpDesc] + val r = restored.asInstanceOf[JSONLScanSourceOpDesc] + r.flatten shouldBe true + r.fileEncoding shouldBe FileDecodingMethod.UTF_16 + r.limit shouldBe Some(10) + r.offset shouldBe Some(5) + } +}
