This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-cm-for-loop-mat-dcm
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-cm-for-loop-mat-dcm by
this push:
new acbb0b5b40 fix
acbb0b5b40 is described below
commit acbb0b5b402c76bd98be3b580eb9d0a4dc9336c0
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Feb 4 11:17:41 2026 -0800
fix
---
.../engine/architecture/worker/DataProcessor.scala | 1 +
.../promisehandlers/EndIterationHandler.scala | 32 +++++++++++-----------
2 files changed, 17 insertions(+), 16 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
index dedfc57802..adbc97bfe4 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
@@ -173,6 +173,7 @@ class DataProcessor(
EndIterationRequest(worker)
)
outputManager.ECMWriterThreads(portId).putOne(new
Tuple(ResultSchema.ecmSchema, Array(worker.name)))
+ outputManager.ECMWriterThreads(portId).close()
outputManager.closeOutputStorageWriterIfNeeded(portId)
asyncRPCClient.controllerInterface.iterationCompleted(IterationCompletedRequest(portId),
asyncRPCClient.mkContext(CONTROLLER))
executor.reset()
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala
index fe76fb93bb..80869dc17f 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala
@@ -38,24 +38,24 @@ trait EndIterationHandler {
case _: LoopEndOpExec =>
workerInterface.nextIteration(EmptyRequest(),
mkContext(request.worker))
case _ =>
- val channelId = dp.inputManager.currentChannelId
- val portId = dp.inputGateway.getChannel(channelId).getPortId
- dp.inputManager.getPort(portId).completed = true
- dp.inputManager.initBatch(channelId, Array.empty)
- dp.processOnFinish()
+ }
+ val channelId = dp.inputManager.currentChannelId
+ val portId = dp.inputGateway.getChannel(channelId).getPortId
+ dp.inputManager.getPort(portId).completed = true
+ dp.inputManager.initBatch(channelId, Array.empty)
+ dp.processOnFinish()
- dp.outputManager.outputIterator.appendSpecialTupleToEnd(
- FinalizePort(portId, input = true)
- )
+ dp.outputManager.outputIterator.appendSpecialTupleToEnd(
+ FinalizePort(portId, input = true)
+ )
- if (dp.inputManager.getAllPorts.forall(portId =>
dp.inputManager.isPortCompleted(portId))) {
- // Need this check for handling input port dependency relationships.
- // See documentation of isMissingOutputPort
- if (!dp.outputManager.isMissingOutputPort) {
- // assuming all the output ports finalize after all input ports
are finalized.
- dp.outputManager.finalizeIteration(request.worker)
- }
- }
+ if (dp.inputManager.getAllPorts.forall(portId =>
dp.inputManager.isPortCompleted(portId))) {
+ // Need this check for handling input port dependency relationships.
+ // See documentation of isMissingOutputPort
+ if (!dp.outputManager.isMissingOutputPort) {
+ // assuming all the output ports finalize after all input ports are
finalized.
+ dp.outputManager.finalizeIteration(request.worker)
+ }
}
EmptyReturn()
}