This is an automated email from the ASF dual-hosted git repository. linxinyuan pushed a commit to branch xinyuan-loop-feb in repository https://gitbox.apache.org/repos/asf/texera.git
commit 592ff644f6bf6eb79493209005c2fc415c93ebe1 Author: Xinyuan Lin <[email protected]> AuthorDate: Mon Feb 9 01:48:57 2026 -0800 init --- .../controller/WorkflowScheduler.scala | 7 ++- .../controller/execution/RegionExecution.scala | 2 - .../controller/execution/WorkflowExecution.scala | 6 +- .../engine/architecture/scheduling/Schedule.scala | 9 +++ .../worker/promisehandlers/AssignPortHandler.scala | 3 +- .../InitializeExecutorHandler.scala | 1 + .../user/workflow/WorkflowExecutionsResource.scala | 2 + .../apache/texera/amber/operator/LogicalOp.scala | 31 +++-------- .../texera/amber/operator/loop/LoopEndOpDesc.scala | 53 ++++++++++++++++++ .../texera/amber/operator/loop/LoopEndOpExec.scala | 8 +++ .../amber/operator/loop/LoopStartOpDesc.scala | 62 +++++++++++++++++++++ .../amber/operator/loop/LoopStartOpExec.scala | 28 +++++++--- .../workflow-editor/workflow-editor.component.ts | 1 - frontend/src/assets/operator_images/LoopEnd.png | Bin 0 -> 5865 bytes frontend/src/assets/operator_images/LoopStart.png | Bin 0 -> 2138 bytes 15 files changed, 170 insertions(+), 43 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala index 9dcf3ad4bf..e2239f99a9 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala @@ -52,6 +52,9 @@ class WorkflowScheduler( this.physicalPlan = updatedPhysicalPlan } - def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next() - + def getNextRegions: Set[Region] = { + val region : Set[Region] = if (!schedule.hasNext) Set() else schedule.loopNext() + println("current Region: " + region) + region + } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala index d5939c2e3b..e905c2b044 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala @@ -59,8 +59,6 @@ case class RegionExecution(region: Region) { physicalOpId: PhysicalOpIdentity, inheritOperatorExecution: Option[OperatorExecution] = None ): OperatorExecution = { - assert(!operatorExecutions.contains(physicalOpId), "OperatorExecution already exists.") - operatorExecutions.getOrElseUpdate( physicalOpId, inheritOperatorExecution diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala index dea9b692a4..31409b180a 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala @@ -44,11 +44,7 @@ case class WorkflowExecution() { * @throws AssertionError if the `RegionExecution` has already been initialized. */ def initRegionExecution(region: Region): RegionExecution = { - // ensure the region execution hasn't been initialized already. - assert( - !regionExecutions.contains(region.id), - s"RegionExecution of ${region.id} already initialized." - ) + regionExecutions.remove(region.id) regionExecutions.getOrElseUpdate(region.id, RegionExecution(region)) } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala index 6f34c9ed1e..d0ba526809 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala @@ -21,6 +21,7 @@ package org.apache.texera.amber.engine.architecture.scheduling case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] { private var currentLevel = levelSets.keys.minOption.getOrElse(0) + private var loopStartLevel = currentLevel def getRegions: List[Region] = levelSets.values.flatten.toList @@ -28,6 +29,14 @@ case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterat override def next(): Set[Region] = { val regions = levelSets(currentLevel) + if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopStart-operator-")))) loopStartLevel = currentLevel + currentLevel += 1 + regions + } + + def loopNext(): Set[Region] = { + val regions = levelSets(currentLevel) + if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-")))) currentLevel = loopStartLevel currentLevel += 1 regions } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala index fe959733ab..1cc725dff8 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala @@ -41,6 +41,7 @@ trait AssignPortHandler { this: DataProcessorRPCHandlerInitializer => override def assignPort(msg: AssignPortRequest, ctx: AsyncRPCContext): Future[EmptyReturn] = { + println("ergergerge") val schema = Schema.fromRawSchema(msg.schema) if (msg.input) { val inputPortURIStrs = msg.storageUris.toList @@ -55,7 +56,7 @@ trait AssignPortHandler { // Same as AddInputChannelHandler dp.inputGateway.getChannel(channelId).setPortId(msg.portId) dp.inputManager.getPort(msg.portId).channels.add(channelId) - dp.stateManager.assertState(READY, RUNNING, PAUSED) + //dp.stateManager.assertState(READY, RUNNING, PAUSED) } } else { val storageURIOption: Option[URI] = msg.storageUris.head match { diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala index bf45d8eff9..cc1a32594b 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala @@ -39,6 +39,7 @@ trait InitializeExecutorHandler { req: InitializeExecutorRequest, ctx: AsyncRPCContext ): Future[EmptyReturn] = { + println(s"Initializing executor with request: $req") dp.serializationManager.setOpInitialization(req) val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId) val workerCount = req.totalWorkerCount diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 72fb1c364e..92582afdd2 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -247,6 +247,8 @@ object WorkflowExecutionsResource { OPERATOR_PORT_EXECUTIONS.RESULT_URI ) .values(eid.id.toInt, globalPortId.serializeAsString, uri.toString) + .onConflict() + .doNothing() .execute() } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala index eb319a82d1..ee57514212 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala @@ -24,15 +24,8 @@ import com.fasterxml.jackson.annotation._ import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import org.apache.texera.amber.core.executor.OperatorExecutor import org.apache.texera.amber.core.tuple.Schema -import org.apache.texera.amber.core.virtualidentity.{ - ExecutionIdentity, - OperatorIdentity, - WorkflowIdentity -} -import org.apache.texera.amber.core.workflow.WorkflowContext.{ - DEFAULT_EXECUTION_ID, - DEFAULT_WORKFLOW_ID -} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.WorkflowContext.{DEFAULT_EXECUTION_ID, DEFAULT_WORKFLOW_ID} import org.apache.texera.amber.core.workflow.{PhysicalOp, PhysicalPlan, PortIdentity} import org.apache.texera.amber.operator.aggregate.AggregateOpDesc import org.apache.texera.amber.operator.cartesianProduct.CartesianProductOpDesc @@ -42,22 +35,14 @@ import org.apache.texera.amber.operator.distinct.DistinctOpDesc import org.apache.texera.amber.operator.dummy.DummyOpDesc import org.apache.texera.amber.operator.filter.SpecializedFilterOpDesc import org.apache.texera.amber.operator.hashJoin.HashJoinOpDesc -import org.apache.texera.amber.operator.huggingFace.{ - HuggingFaceIrisLogisticRegressionOpDesc, - HuggingFaceSentimentAnalysisOpDesc, - HuggingFaceSpamSMSDetectionOpDesc, - HuggingFaceTextSummarizationOpDesc -} +import org.apache.texera.amber.operator.huggingFace.{HuggingFaceIrisLogisticRegressionOpDesc, HuggingFaceSentimentAnalysisOpDesc, HuggingFaceSpamSMSDetectionOpDesc, HuggingFaceTextSummarizationOpDesc} import org.apache.texera.amber.operator.ifStatement.IfOpDesc import org.apache.texera.amber.operator.intersect.IntersectOpDesc import org.apache.texera.amber.operator.intervalJoin.IntervalJoinOpDesc import org.apache.texera.amber.operator.keywordSearch.KeywordSearchOpDesc import org.apache.texera.amber.operator.limit.LimitOpDesc import org.apache.texera.amber.operator.machineLearning.Scorer.MachineLearningScorerOpDesc -import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{ - SklearnAdvancedKNNClassifierTrainerOpDesc, - SklearnAdvancedKNNRegressorTrainerOpDesc -} +import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{SklearnAdvancedKNNClassifierTrainerOpDesc, SklearnAdvancedKNNRegressorTrainerOpDesc} import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.SVCTrainer.SklearnAdvancedSVCTrainerOpDesc import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.SVRTrainer.SklearnAdvancedSVRTrainerOpDesc import org.apache.texera.amber.operator.metadata.{OPVersion, OperatorInfo, PropertyNameConstants} @@ -71,10 +56,7 @@ import org.apache.texera.amber.operator.sleep.SleepOpDesc import org.apache.texera.amber.operator.sort.{SortOpDesc, StableMergeSortOpDesc} import org.apache.texera.amber.operator.sortPartitions.SortPartitionsOpDesc import org.apache.texera.amber.operator.source.apis.reddit.RedditSearchSourceOpDesc -import org.apache.texera.amber.operator.source.apis.twitter.v2.{ - TwitterFullArchiveSearchSourceOpDesc, - TwitterSearchSourceOpDesc -} +import org.apache.texera.amber.operator.source.apis.twitter.v2.{TwitterFullArchiveSearchSourceOpDesc, TwitterSearchSourceOpDesc} import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc import org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpDesc @@ -137,6 +119,7 @@ import org.apache.texera.amber.operator.visualization.volcanoPlot.VolcanoPlotOpD import org.apache.texera.amber.operator.visualization.waterfallChart.WaterfallChartOpDesc import org.apache.texera.amber.operator.visualization.wordCloud.WordCloudOpDesc import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder, ToStringBuilder} +import org.apache.texera.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc} import org.apache.texera.amber.operator.visualization.stripChart.StripChartOpDesc import java.util.UUID @@ -202,6 +185,8 @@ trait StateTransferFunc new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"), new Type(value = classOf[LimitOpDesc], name = "Limit"), new Type(value = classOf[SleepOpDesc], name = "Sleep"), + new Type(value = classOf[LoopStartOpDesc], name = "LoopStart"), + new Type(value = classOf[LoopEndOpDesc], name = "LoopEnd"), new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"), new Type(value = classOf[ReservoirSamplingOpDesc], name = "ReservoirSampling"), new Type(value = classOf[HashJoinOpDesc[String]], name = "HashJoin"), diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala new file mode 100644 index 0000000000..f56068e903 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.loop + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} + +class LoopEndOpDesc extends LogicalOp { + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName("org.apache.texera.amber.operator.loop.LoopEndOpExec") + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop End", + "Loop End", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala new file mode 100644 index 0000000000..60f18cd5fc --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala @@ -0,0 +1,8 @@ +package org.apache.texera.amber.operator.loop + +import org.apache.texera.amber.core.executor.OperatorExecutor +import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} + +class LoopEndOpExec extends OperatorExecutor { + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala new file mode 100644 index 0000000000..4f3e86eb62 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.loop + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import org.apache.texera.amber.util.JSONUtils.objectMapper + +class LoopStartOpDesc extends LogicalOp { + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName( + "org.apache.texera.amber.operator.loop.LoopStartOpExec", + objectMapper.writeValueAsString(this) + ) + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + .withParallelizable(false) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop Start", + "Loop Start", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) + +} diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala similarity index 55% copy from amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala copy to common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala index 6f34c9ed1e..1ff10c650c 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala @@ -17,18 +17,28 @@ * under the License. */ -package org.apache.texera.amber.engine.architecture.scheduling +package org.apache.texera.amber.operator.loop -case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] { - private var currentLevel = levelSets.keys.minOption.getOrElse(0) +import org.apache.texera.amber.core.executor.OperatorExecutor +import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} +import org.apache.texera.amber.util.JSONUtils.objectMapper - def getRegions: List[Region] = levelSets.values.flatten.toList +import scala.collection.mutable - override def hasNext: Boolean = levelSets.isDefinedAt(currentLevel) +class LoopStartOpExec(descString: String) extends OperatorExecutor { + private val data = new mutable.ArrayBuffer[Tuple] + private var currentIteration = 0 - override def next(): Set[Region] = { - val regions = levelSets(currentLevel) - currentLevel += 1 - regions + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = { + data.append(tuple) + Iterator.empty } + + override def onFinish(port: Int): Iterator[TupleLike] = { + currentIteration += 1 + data.iterator + } + } + + diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts index b23f92caf3..185b723b77 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts @@ -341,7 +341,6 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy body: { fill: "rgba(158,158,158,0.2)", pointerEvents: "none", - visibility: "hidden", }, }, }, diff --git a/frontend/src/assets/operator_images/LoopEnd.png b/frontend/src/assets/operator_images/LoopEnd.png new file mode 100644 index 0000000000..ee0f9ab6fa Binary files /dev/null and b/frontend/src/assets/operator_images/LoopEnd.png differ diff --git a/frontend/src/assets/operator_images/LoopStart.png b/frontend/src/assets/operator_images/LoopStart.png new file mode 100644 index 0000000000..7e5be023cd Binary files /dev/null and b/frontend/src/assets/operator_images/LoopStart.png differ
