This is an automated email from the ASF dual-hosted git repository. linxinyuan pushed a commit to branch xinyuan-cm-for-loop-mat in repository https://gitbox.apache.org/repos/asf/texera.git
commit d6edcfd23c842c003a91c4d887811c8fe11f07bc Author: Xinyuan Lin <[email protected]> AuthorDate: Tue Jan 27 23:53:36 2026 -0800 init --- .../messaginglayer/OutputManager.scala | 27 +++++++++++----------- .../engine/architecture/worker/DataProcessor.scala | 4 +++- .../InputPortMaterializationReaderThread.scala | 10 ++++++++ .../managers/OutputPortResultWriterThread.scala | 18 +++++++++++---- common/config/src/main/resources/storage.conf | 3 +++ .../apache/texera/amber/config/StorageConfig.scala | 2 ++ .../amber/core/storage/DocumentFactory.scala | 2 ++ .../texera/amber/core/storage/VFSURIFactory.scala | 1 + .../amber/core/storage/result/ResultSchema.scala | 4 ++++ 9 files changed, 52 insertions(+), 19 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 9a5ff22439..1fa43cda4a 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -20,22 +20,16 @@ package org.apache.texera.amber.engine.architecture.messaginglayer import org.apache.texera.amber.core.state.State -import org.apache.texera.amber.core.storage.DocumentFactory +import org.apache.texera.amber.core.storage.{DocumentFactory, VFSResourceType} import org.apache.texera.amber.core.storage.model.BufferedItemWriter +import org.apache.texera.amber.core.storage.result.ResultSchema import org.apache.texera.amber.core.tuple._ import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity} -import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.{ - DPOutputIterator, - getBatchSize, - toPartitioner -} +import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.{DPOutputIterator, getBatchSize, toPartitioner} import org.apache.texera.amber.engine.architecture.sendsemantics.partitioners._ import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._ -import org.apache.texera.amber.engine.architecture.worker.managers.{ - OutputPortResultWriterThread, - PortStorageWriterTerminateSignal -} +import org.apache.texera.amber.engine.architecture.worker.managers.{OutputPortResultWriterThread, PortStorageWriterTerminateSignal} import org.apache.texera.amber.engine.common.AmberLogging import org.apache.texera.amber.util.VirtualIdentityUtils @@ -215,7 +209,7 @@ class OutputManager( * @param outputPortId If not specified, the tuple will be written to all output ports that need storage. */ def saveTupleToStorageIfNeeded( - tuple: Tuple, + tuple: Either[Tuple, String], outputPortId: Option[PortIdentity] = None ): Unit = { (outputPortId match { @@ -284,12 +278,19 @@ class OutputManager( } private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: URI): Unit = { - val bufferedItemWriter = DocumentFactory + val bufferedTupleWriter = DocumentFactory .openDocument(storageUri) ._1 .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) .asInstanceOf[BufferedItemWriter[Tuple]] - val writerThread = new OutputPortResultWriterThread(bufferedItemWriter) + + val ecmUri = storageUri.resolve("ecm") + val bufferedECMWriter = DocumentFactory + .createDocument(ecmUri, ResultSchema.ecmSchema) + .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) + .asInstanceOf[BufferedItemWriter[Tuple]] + + val writerThread = new OutputPortResultWriterThread(bufferedTupleWriter, bufferedECMWriter) this.outputPortResultWriterThreads(portId) = writerThread writerThread.start() } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index dae3446826..1ef9dd7d99 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -188,13 +188,15 @@ class DataProcessor( PORT_ALIGNMENT, EndIterationRequest(worker) ) + executor.reset() case schemaEnforceable: SchemaEnforceable => val portIdentity = outputPortOpt.getOrElse(outputManager.getSingleOutputPortIdentity) val tuple = schemaEnforceable.enforceSchema(outputManager.getPort(portIdentity).schema) statisticsManager.increaseOutputStatistics(portIdentity, tuple.inMemSize) outputManager.passTupleToDownstream(tuple, outputPortOpt) - outputManager.saveTupleToStorageIfNeeded(tuple, outputPortOpt) + outputManager.saveTupleToStorageIfNeeded(Right(actorId.toString), outputPortOpt) + outputManager.saveTupleToStorageIfNeeded(Left(tuple), outputPortOpt) case other => // skip for now } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala index 10fbbc44a2..953bc677af 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala @@ -84,6 +84,16 @@ class InputPortMaterializationReaderThread( // Notify the input port of start of input channel emitECM(METHOD_START_CHANNEL, NO_ALIGNMENT) try { + val ecm: VirtualDocument[Tuple] = DocumentFactory + .openDocument(uri.resolve("ecm")) + ._1 + .asInstanceOf[VirtualDocument[Tuple]] + val ecmReadIterator = ecm.get() + if (ecmReadIterator.hasNext) { + val tuple = ecmReadIterator.next() + println("Received ECM tuple: " + tuple) + } + val materialization: VirtualDocument[Tuple] = DocumentFactory .openDocument(uri) ._1 diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala index 28e5d2af66..f8d0710aa8 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala @@ -22,6 +22,7 @@ package org.apache.texera.amber.engine.architecture.worker.managers import com.google.common.collect.Queues import org.apache.texera.amber.core.storage.model.BufferedItemWriter import org.apache.texera.amber.core.tuple.Tuple +import org.apache.texera.amber.core.storage.result.ResultSchema import java.util.concurrent.LinkedBlockingQueue @@ -29,21 +30,28 @@ sealed trait TerminateSignal case object PortStorageWriterTerminateSignal extends TerminateSignal class OutputPortResultWriterThread( - bufferedItemWriter: BufferedItemWriter[Tuple] + bufferedTupleWriter: BufferedItemWriter[Tuple], + bufferedECMWriter: BufferedItemWriter[Tuple] ) extends Thread { - val queue: LinkedBlockingQueue[Either[Tuple, TerminateSignal]] = - Queues.newLinkedBlockingQueue[Either[Tuple, TerminateSignal]]() + val queue: LinkedBlockingQueue[Either[Either[Tuple, String], TerminateSignal]] = + Queues.newLinkedBlockingQueue[Either[Either[Tuple, String], TerminateSignal]]() override def run(): Unit = { var internalStop = false while (!internalStop) { val queueContent = queue.take() queueContent match { - case Left(tuple) => bufferedItemWriter.putOne(tuple) + case Left(item) => item match { + case Left(tuple) => bufferedTupleWriter.putOne(tuple) + case Right(ecm) => + val ecmTuple = new Tuple(ResultSchema.ecmSchema, Array(ecm)) + bufferedECMWriter.putOne(ecmTuple) + } case Right(_) => internalStop = true } } - bufferedItemWriter.close() + bufferedTupleWriter.close() + bufferedECMWriter.close() } } diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf index 85a62b77a3..a158eb8753 100644 --- a/common/config/src/main/resources/storage.conf +++ b/common/config/src/main/resources/storage.conf @@ -52,6 +52,9 @@ storage { runtime-statistics-namespace = "workflow-runtime-statistics" runtime-statistics-namespace = ${?STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE} + ecm-namespace = "ecm" + ecm-namespace = ${?STORAGE_ICEBERG_TABLE_ECM_NAMESPACE} + commit { batch-size = 4096 # decide the buffer size of our IcebergTableWriter batch-size = ${?STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE} diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala index c5bd330286..8a1aba73b7 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala @@ -63,6 +63,7 @@ object StorageConfig { conf.getInt("storage.iceberg.table.commit.retry.min-wait-ms") val icebergTableCommitMaxRetryWaitMs: Int = conf.getInt("storage.iceberg.table.commit.retry.max-wait-ms") + val icebergTableECMNamespace: String = conf.getString("storage.iceberg.table.ecm-namespace") // LakeFS specifics // lakefsEndpoint is a var because in test we need to override it to point to the test container @@ -116,6 +117,7 @@ object StorageConfig { val ENV_ICEBERG_TABLE_COMMIT_NUM_RETRIES = "STORAGE_ICEBERG_TABLE_COMMIT_NUM_RETRIES" val ENV_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS = "STORAGE_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS" val ENV_ICEBERG_TABLE_COMMIT_MAX_WAIT_MS = "STORAGE_ICEBERG_TABLE_COMMIT_MAX_WAIT_MS" + val ENV_ICEBERG_TABLE_ECM_NAMESPACE = "STORAGE_ICEBERG_TABLE_ECM_NAMESPACE" // LakeFS val ENV_LAKEFS_ENDPOINT = "STORAGE_LAKEFS_ENDPOINT" diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala index 4c37c33bb2..5b27170560 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala @@ -72,6 +72,7 @@ object DocumentFactory { case RESULT => StorageConfig.icebergTableResultNamespace case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case ECM => StorageConfig.icebergTableECMNamespace case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } @@ -126,6 +127,7 @@ object DocumentFactory { case RESULT => StorageConfig.icebergTableResultNamespace case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case ECM => StorageConfig.icebergTableECMNamespace case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala index 3513ac5ecd..f4f3a9e4bf 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala @@ -34,6 +34,7 @@ object VFSResourceType extends Enumeration { val RESULT: Value = Value("result") val RUNTIME_STATISTICS: Value = Value("runtimeStatistics") val CONSOLE_MESSAGES: Value = Value("consoleMessages") + val ECM: Value = Value("ecm") } object VFSURIFactory { diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala index ade33283f7..a3978d0c71 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala @@ -39,4 +39,8 @@ object ResultSchema { val consoleMessagesSchema: Schema = new Schema( new Attribute("message", AttributeType.STRING) ) + + val ecmSchema: Schema = new Schema( + new Attribute("workerId", AttributeType.STRING) + ) }
