This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-stage-by-stage
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-stage-by-stage by this push:
new 916f081fe0 fix
916f081fe0 is described below
commit 916f081fe0b91169258513aa3e3168a06e71af1f
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Jan 22 20:59:10 2026 -0800
fix
---
.../engine/e2e/BatchSizePropagationSpec.scala | 26 +++++-----------------
.../amber/core/workflow/WorkflowContext.scala | 5 +----
.../amber/core/workflow/WorkflowSettings.scala | 6 +++--
3 files changed, 10 insertions(+), 27 deletions(-)
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala
index 93e94b4fa9..3e71d60ba3 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala
@@ -24,7 +24,6 @@ import org.apache.pekko.testkit.{ImplicitSender, TestKit}
import org.apache.pekko.util.Timeout
import org.apache.texera.amber.clustering.SingleNodeListener
import org.apache.texera.amber.core.workflow.{
- ExecutionMode,
PortIdentity,
WorkflowContext,
WorkflowSettings
@@ -124,10 +123,7 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for headerlessCsv workflow" in {
val expectedBatchSize = 1
- val customWorkflowSettings = WorkflowSettings(
- dataTransferBatchSize = expectedBatchSize,
- executionMode = ExecutionMode.PIPELINED
- )
+ val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
@@ -149,10 +145,7 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for headerlessCsv->keyword workflow" in {
val expectedBatchSize = 500
- val customWorkflowSettings = WorkflowSettings(
- dataTransferBatchSize = expectedBatchSize,
- executionMode = ExecutionMode.PIPELINED
- )
+ val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
@@ -182,10 +175,7 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for csv->keyword->count workflow" in {
val expectedBatchSize = 100
- val customWorkflowSettings = WorkflowSettings(
- dataTransferBatchSize = expectedBatchSize,
- executionMode = ExecutionMode.PIPELINED
- )
+ val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
@@ -223,10 +213,7 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for csv->keyword->averageAndGroupBy workflow" in {
val expectedBatchSize = 300
- val customWorkflowSettings = WorkflowSettings(
- dataTransferBatchSize = expectedBatchSize,
- executionMode = ExecutionMode.PIPELINED
- )
+ val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
@@ -267,10 +254,7 @@ class BatchSizePropagationSpec
"Engine" should "propagate the correct batch size for csv->(csv->)->join workflow" in {
val expectedBatchSize = 1
- val customWorkflowSettings = WorkflowSettings(
- dataTransferBatchSize = expectedBatchSize,
- executionMode = ExecutionMode.PIPELINED
- )
+ val customWorkflowSettings = WorkflowSettings(dataTransferBatchSize = expectedBatchSize)
val context =
new WorkflowContext(workflowSettings = customWorkflowSettings)
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
index dc4edf2782..bf2911a4e5 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowContext.scala
@@ -29,10 +29,7 @@ import org.apache.texera.amber.core.workflow.WorkflowContext.{
object WorkflowContext {
val DEFAULT_EXECUTION_ID: ExecutionIdentity = ExecutionIdentity(1L)
val DEFAULT_WORKFLOW_ID: WorkflowIdentity = WorkflowIdentity(1L)
- val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings(
- dataTransferBatchSize = 400,
- executionMode = ExecutionMode.PIPELINED
- )
+ val DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = WorkflowSettings()
}
class WorkflowContext(
var workflowId: WorkflowIdentity = DEFAULT_WORKFLOW_ID,
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
index f5b4a610fd..74408060f9 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/WorkflowSettings.scala
@@ -19,8 +19,10 @@
package org.apache.texera.amber.core.workflow
+import org.apache.texera.config.GuiConfig
+
case class WorkflowSettings(
- dataTransferBatchSize: Int,
+ dataTransferBatchSize: Int = 400,
outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty,
- executionMode: ExecutionMode
+ executionMode: ExecutionMode = ExecutionMode.valueOf(GuiConfig.guiWorkflowWorkspaceDefaultExecutionMode)
)