This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 14f8be4c08 test(amber): add direct unit tests for WorkflowCompiler (#5019)
14f8be4c08 is described below
commit 14f8be4c0856dd9f0be2aa594bdfe46a92f9d4cd
Author: Yicong Huang <[email protected]>
AuthorDate: Mon May 11 00:15:02 2026 -0700
test(amber): add direct unit tests for WorkflowCompiler (#5019)
---
.../texera/workflow/WorkflowCompilerSpec.scala | 212 +++++++++++++++++++++
1 file changed, 212 insertions(+)
diff --git a/amber/src/test/scala/org/apache/texera/workflow/WorkflowCompilerSpec.scala b/amber/src/test/scala/org/apache/texera/workflow/WorkflowCompilerSpec.scala
new file mode 100644
index 0000000000..94a8ffce55
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/workflow/WorkflowCompilerSpec.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.workflow
+
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.operator.TestOperators
+import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
+import org.apache.texera.web.model.websocket.request.LogicalPlanPojo
+import org.scalatest.flatspec.AnyFlatSpec
+
/**
 * Direct unit coverage for [[WorkflowCompiler]]. Today the compiler is only
 * exercised transitively by e2e/scheduler specs through
 * [[org.apache.texera.amber.engine.e2e.TestUtils.buildWorkflow]]. The cases
 * below pin its contract — physical-plan shape, storage-port collection, and
 * strict-mode error behavior — so future refactors (notably the planned
 * merge with workflow-compiling-service's compiler) have a direct anchor.
 *
 * Most cases share one two-operator pipeline (a CSV scan feeding a keyword
 * search); it is built by `CsvToKeywordFixture` so each test only states
 * what it asserts.
 *
 * Not yet covered: the Python codegen `#EXCEPTION DURING CODE GENERATION:`
 * regex branch. Triggering it requires a `PythonOperatorDescriptor` subclass
 * whose `generatePythonCode()` throws; left for a follow-up so this initial
 * spec stays focused on plumbing the compiler boundary itself.
 */
class WorkflowCompilerSpec extends AnyFlatSpec {

  /**
   * Builds the [[LogicalPlanPojo]] handed to `WorkflowCompiler.compile`.
   *
   * @param operators       logical operators in the plan
   * @param links           logical links between those operators
   * @param opsToViewResult operator ids (as strings) whose results are
   *                        explicitly requested; empty by default
   * @return the POJO; its fourth field is always passed as `List.empty`
   */
  private def pojo(
      operators: List[org.apache.texera.amber.operator.LogicalOp],
      links: List[LogicalLink],
      opsToViewResult: List[String] = List.empty
  ): LogicalPlanPojo =
    LogicalPlanPojo(operators, links, opsToViewResult, List.empty)

  /**
   * Shared fixture: a CSV scan source linked (default port to default port)
   * into a keyword search, compiled once against a fresh [[WorkflowContext]].
   *
   * @param viewCsvResult when true, the CSV op's id is passed in
   *                      `opsToViewResult`, i.e. its result is requested
   *                      even though it is not a terminal op
   */
  private final class CsvToKeywordFixture(viewCsvResult: Boolean = false) {
    val csv = TestOperators.smallCsvScanOpDesc()
    val keyword = TestOperators.keywordSearchOpDesc("Region", "Asia")
    // One context per fixture: the storage tests below read what `compile`
    // wrote back onto it, so a shared context would leak state across cases.
    val ctx = new WorkflowContext()
    val workflow = new WorkflowCompiler(ctx).compile(
      pojo(
        List(csv, keyword),
        List(
          LogicalLink(
            csv.operatorIdentifier,
            PortIdentity(),
            keyword.operatorIdentifier,
            PortIdentity()
          )
        ),
        opsToViewResult =
          if (viewCsvResult) List(csv.operatorIdentifier.id) else List.empty
      )
    )

    /** Logical op ids of every port recorded in `outputPortsNeedingStorage`. */
    def logicalOpsInStorage =
      ctx.workflowSettings.outputPortsNeedingStorage.map(_.opId.logicalOpId)
  }

  // -------------------- physical-plan shape --------------------

  "WorkflowCompiler" should "produce a physical plan that contains at least one physical op per logical op" in {
    val fx = new CsvToKeywordFixture()
    val plan = fx.workflow.physicalPlan

    assert(fx.workflow.logicalPlan.operators.size == 2)
    assert(plan.getPhysicalOpsOfLogicalOp(fx.csv.operatorIdentifier).nonEmpty)
    assert(plan.getPhysicalOpsOfLogicalOp(fx.keyword.operatorIdentifier).nonEmpty)
  }

  it should "translate a logical link into a physical link between the two logical ops' physical ops" in {
    val fx = new CsvToKeywordFixture()
    val plan = fx.workflow.physicalPlan

    val csvPhysIds =
      plan.getPhysicalOpsOfLogicalOp(fx.csv.operatorIdentifier).map(_.id).toSet
    val keywordPhysIds =
      plan.getPhysicalOpsOfLogicalOp(fx.keyword.operatorIdentifier).map(_.id).toSet

    // Any link whose endpoints fall on the csv side and the keyword side
    // respectively realizes the single logical csv -> keyword link.
    val bridging = plan.links.filter(l =>
      csvPhysIds.contains(l.fromOpId) && keywordPhysIds.contains(l.toOpId)
    )
    assert(bridging.nonEmpty, "expected at least one physical link from csv to keyword")
  }

  // -------------------- storage-port collection --------------------

  // The compiler walks `logicalPlan.getTerminalOperatorIds` (logical ops with
  // out-degree 0) plus `opsToViewResult`, and for every physical op of those
  // logical ops collects every non-internal output port into
  // `outputPortsNeedingStorage`, which it then writes back onto the
  // workflow context. These tests pin that the *mutation* lands on the
  // context (not just a side value), and that both the terminal-default and
  // the opsToViewResult-additive paths populate it.

  "WorkflowCompiler" should "mark the terminal op's output port as needing storage on the context" in {
    val fx = new CsvToKeywordFixture()
    val inStorage = fx.logicalOpsInStorage

    assert(
      inStorage.contains(fx.keyword.operatorIdentifier),
      s"expected keyword to be marked for storage, got $inStorage"
    )
    assert(
      !inStorage.contains(fx.csv.operatorIdentifier),
      "csv is not terminal and was not requested via opsToViewResult; it should not be in storage"
    )
  }

  it should "also mark a non-terminal op for storage when it is named in opsToViewResult" in {
    val fx = new CsvToKeywordFixture(viewCsvResult = true)
    val logicalOpsInStorage = fx.logicalOpsInStorage

    assert(
      logicalOpsInStorage.contains(fx.csv.operatorIdentifier),
      s"opsToViewResult should add csv to storage, got $logicalOpsInStorage"
    )
    assert(
      logicalOpsInStorage.contains(fx.keyword.operatorIdentifier),
      s"terminal keyword should remain in storage, got $logicalOpsInStorage"
    )
  }

  it should "treat a single source op as terminal and mark its output port for storage" in {
    val csv = TestOperators.smallCsvScanOpDesc()
    val ctx = new WorkflowContext()

    new WorkflowCompiler(ctx).compile(pojo(List(csv), List.empty))

    val storage = ctx.workflowSettings.outputPortsNeedingStorage
    assert(
      storage.exists(_.opId.logicalOpId == csv.operatorIdentifier),
      "single op has out-degree 0, so its output port should land in storage"
    )
    // The assertion above guarantees `storage` is non-empty; without it this
    // `forall` check would pass vacuously on an empty collection.
    assert(
      storage.forall(!_.portId.internal),
      "compiler must filter out internal ports; storage should expose only user-visible outputs"
    )
  }

  // -------------------- strict-mode error semantics --------------------

  // A new subject string starts a separate group in the FlatSpec report.
  "WorkflowCompiler in strict mode (no errorList)" should "throw when a scan source has no fileName set" in {
    // CSVScanSourceOpDesc defaults fileName to None; `resolveScanSourceOpFileName(None)`
    // hits `scanOp.fileName.getOrElse(throw new RuntimeException("no input file name"))`
    // and surfaces that exception out of `compile` because the compiler passes
    // `None` for the errorList (i.e. fail-fast on the execution path).
    val orphanCsv = new CSVScanSourceOpDesc()
    val ctx = new WorkflowContext()

    val ex = intercept[RuntimeException] {
      new WorkflowCompiler(ctx).compile(pojo(List(orphanCsv), List.empty))
    }
    assert(ex.getMessage == "no input file name")
  }
}