This is an automated email from the ASF dual-hosted git repository. aglinxinyuan pushed a commit to branch xinyuan-state-materialization in repository https://gitbox.apache.org/repos/asf/texera.git
commit 3c0c8164e7657234820a6798a42ebaa30d9f6abf Author: Xinyuan Lin <[email protected]> AuthorDate: Thu Apr 23 00:57:09 2026 -0700 feat: add state materialization support --- .../core/architecture/packaging/output_manager.py | 36 ++++++- amber/src/main/python/core/models/state.py | 4 + .../main/python/core/storage/document_factory.py | 107 ++++++++++++--------- .../input_port_materialization_reader_runnable.py | 29 +++++- .../main/python/core/storage/vfs_uri_factory.py | 1 + .../messaginglayer/OutputManager.scala | 20 ++++ .../scheduling/RegionExecutionCoordinator.scala | 58 +++++++++-- .../engine/architecture/worker/DataProcessor.scala | 1 + .../InputPortMaterializationReaderThread.scala | 26 ++++- .../org/apache/texera/amber/core/state/State.scala | 4 + .../amber/core/storage/DocumentFactory.scala | 2 + .../texera/amber/core/storage/VFSURIFactory.scala | 1 + 12 files changed, 226 insertions(+), 63 deletions(-) diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py index afa9127fe6..065b063f7d 100644 --- a/amber/src/main/python/core/architecture/packaging/output_manager.py +++ b/amber/src/main/python/core/architecture/packaging/output_manager.py @@ -17,6 +17,7 @@ import threading import typing +import uuid from collections import OrderedDict from itertools import chain from loguru import logger @@ -43,7 +44,12 @@ from core.architecture.sendsemantics.round_robin_partitioner import ( ) from core.models import Tuple, Schema, StateFrame from core.models.payload import DataPayload, DataFrame -from core.models.state import State +from core.models.state import ( + State, + STATE_SCHEMA, + serialize_state, + state_uri_from_result_uri, +) from core.storage.document_factory import DocumentFactory from core.storage.runnables.port_storage_writer import ( PortStorageWriter, @@ -87,6 +93,8 @@ class OutputManager: PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread] ] = dict() + self._storage_uris: typing.Dict[PortIdentity, str] = dict() + def is_missing_output_ports(self): """ This method is only used for ensuring correct region execution. @@ -126,6 +134,7 @@ class OutputManager: Create a separate thread for saving output tuples of a port to storage in batch. """ + self._storage_uris[port_id] = storage_uri document, _ = DocumentFactory.open_document(storage_uri) buffered_item_writer = document.writer(str(get_worker_index(self.worker_id))) writer_queue = Queue() @@ -171,6 +180,31 @@ class OutputManager: PortStorageWriterElement(data_tuple=tuple_) ) + def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None: + if port_id is None: + uris = self._storage_uris.values() + elif port_id in self._storage_uris: + uris = [self._storage_uris[port_id]] + else: + return + + for uri in uris: + state_uri = state_uri_from_result_uri(uri) + try: + document = DocumentFactory.open_document(state_uri)[0] + except ValueError: + document = DocumentFactory.create_document(state_uri, STATE_SCHEMA) + writer = document.writer(str(uuid.uuid4())) + writer.put_one(serialize_state(state)) + writer.close() + + def reset_output_storage(self) -> None: + port_id = self.get_port_ids()[0] + storage_uri = self._storage_uris[port_id] + self.close_port_storage_writers() + DocumentFactory.create_document(storage_uri, self._ports[port_id].get_schema()) + self.set_up_port_storage_writer(port_id, storage_uri) + def close_port_storage_writers(self) -> None: """ Flush the buffers of port storage writers and wait for all the diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py index a496d5c41c..e5726cc3c2 100644 --- a/amber/src/main/python/core/models/state.py +++ b/amber/src/main/python/core/models/state.py @@ -32,6 +32,10 @@ _BYTES_TYPE = "bytes" STATE_SCHEMA = Schema(raw_schema={STATE_CONTENT: "STRING"}) +def state_uri_from_result_uri(result_uri: str) -> str: + return result_uri.replace("/result", "/state") + + def serialize_state(state: State) -> Tuple: return Tuple( { diff --git a/amber/src/main/python/core/storage/document_factory.py b/amber/src/main/python/core/storage/document_factory.py index 9b686ab66b..8a4d6fe3c5 100644 --- a/amber/src/main/python/core/storage/document_factory.py +++ b/amber/src/main/python/core/storage/document_factory.py @@ -61,30 +61,35 @@ class DocumentFactory: if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME: _, _, _, resource_type = VFSURIFactory.decode_uri(uri) - if resource_type in {VFSResourceType.RESULT}: - storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) - - # Convert Amber Schema to Iceberg Schema with LARGE_BINARY - # field name encoding - iceberg_schema = amber_schema_to_iceberg_schema(schema) - - create_table( - IcebergCatalogInstance.get_instance(), - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - iceberg_schema, - override_if_exists=True, - ) - - return IcebergDocument[Tuple]( - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - iceberg_schema, - amber_tuples_to_arrow_table, - arrow_table_to_amber_tuples, - ) - else: - raise ValueError(f"Resource type {resource_type} is not supported") + match resource_type: + case VFSResourceType.RESULT: + namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE + case VFSResourceType.STATE: + namespace = "state" + case _: + raise ValueError(f"Resource type {resource_type} is not supported") + + storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) + # Convert Amber Schema to Iceberg Schema with LARGE_BINARY + # field name encoding + iceberg_schema = amber_schema_to_iceberg_schema(schema) + + create_table( + IcebergCatalogInstance.get_instance(), + namespace, + storage_key, + iceberg_schema, + override_if_exists=True, + ) + + return IcebergDocument[Tuple]( + namespace, + storage_key, + iceberg_schema, + amber_tuples_to_arrow_table, + arrow_table_to_amber_tuples, + ) + else: raise NotImplementedError( f"Unsupported URI scheme: {parsed_uri.scheme} for creating the document" @@ -96,30 +101,36 @@ class DocumentFactory: if parsed_uri.scheme == "vfs": _, _, _, resource_type = VFSURIFactory.decode_uri(uri) - if resource_type in {VFSResourceType.RESULT}: - storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) - - table = load_table_metadata( - IcebergCatalogInstance.get_instance(), - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - ) - - if table is None: - raise ValueError("No storage is found for the given URI") - - amber_schema = Schema(table.schema().as_arrow()) - - document = IcebergDocument( - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - table.schema(), - amber_tuples_to_arrow_table, - arrow_table_to_amber_tuples, - ) - return document, amber_schema - else: - raise ValueError(f"Resource type {resource_type} is not supported") + match resource_type: + case VFSResourceType.RESULT: + namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE + case VFSResourceType.STATE: + namespace = "state" + case _: + raise ValueError(f"Resource type {resource_type} is not supported") + + storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) + + table = load_table_metadata( + IcebergCatalogInstance.get_instance(), + namespace, + storage_key, + ) + + if table is None: + raise ValueError("No storage is found for the given URI") + + amber_schema = Schema(table.schema().as_arrow()) + + document = IcebergDocument( + namespace, + storage_key, + table.schema(), + amber_tuples_to_arrow_table, + arrow_table_to_amber_tuples, + ) + return document, amber_schema + else: raise NotImplementedError( f"Unsupported URI scheme: {parsed_uri.scheme} for opening the document" diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py index e49c0316cc..a600f87857 100644 --- a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py +++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py @@ -17,8 +17,8 @@ import typing from loguru import logger -from pyarrow import Table from typing import Union +from pyarrow import Table from core.architecture.sendsemantics.broad_cast_partitioner import ( BroadcastPartitioner, @@ -34,8 +34,9 @@ from core.architecture.sendsemantics.range_based_shuffle_partitioner import ( from core.architecture.sendsemantics.round_robin_partitioner import ( RoundRobinPartitioner, ) -from core.models import Tuple, InternalQueue, DataFrame, DataPayload +from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, StateFrame from core.models.internal_queue import DataElement, ECMElement +from core.models.state import deserialize_state, state_uri_from_result_uri from core.storage.document_factory import DocumentFactory from core.util import Stoppable, get_one_of from core.util.runnable.runnable import Runnable @@ -125,6 +126,15 @@ class InputPortMaterializationReaderRunnable(Runnable, Stoppable): if receiver == self.worker_actor_id: yield self.tuples_to_data_frame(tuples) + def emit_state_with_filter(self, state: State) -> typing.Iterator[StateFrame]: + for receiver, payload in self.partitioner.flush_state(state): + if receiver == self.worker_actor_id: + yield ( + StateFrame(payload) + if isinstance(payload, dict) + else self.tuples_to_data_frame(payload) + ) + def run(self) -> None: """ Main execution logic that reads tuples from the materialized storage and @@ -138,8 +148,21 @@ class InputPortMaterializationReaderRunnable(Runnable, Stoppable): self.uri ) self.emit_ecm("StartChannel", EmbeddedControlMessageType.NO_ALIGNMENT) - storage_iterator = self.materialization.get() + try: + state_document, _ = DocumentFactory.open_document( + state_uri_from_result_uri(self.uri) + ) + state_iterator = state_document.get() + for state in state_iterator: + for state_frame in self.emit_state_with_filter( + deserialize_state(state) + ): + self.emit_payload(state_frame) + except ValueError: + pass + + storage_iterator = self.materialization.get() # Iterate and process tuples. for tup in storage_iterator: if self._stopped: diff --git a/amber/src/main/python/core/storage/vfs_uri_factory.py b/amber/src/main/python/core/storage/vfs_uri_factory.py index de0c5db56e..0e23e60705 100644 --- a/amber/src/main/python/core/storage/vfs_uri_factory.py +++ b/amber/src/main/python/core/storage/vfs_uri_factory.py @@ -34,6 +34,7 @@ class VFSResourceType(str, Enum): RESULT = "result" RUNTIME_STATISTICS = "runtimeStatistics" CONSOLE_MESSAGES = "consoleMessages" + STATE = "state" class VFSURIFactory: diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 4ab3d18056..53755b780c 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -124,6 +124,8 @@ class OutputManager( : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] = mutable.HashMap() + private val storageUris: mutable.HashMap[Int, URI] = mutable.HashMap() + /** * Add down stream operator and its corresponding Partitioner. * @@ -232,6 +234,23 @@ class OutputManager( }) } + def saveStateToStorageIfNeeded(state: State): Unit = { + try { + storageUris.foreach { + case (_, uri) => + val writer = DocumentFactory + .openDocument(State.stateUriFromResultUri(uri)) + ._1 + .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) + .asInstanceOf[BufferedItemWriter[Tuple]] + writer.putOne(State.serialize(state)) + writer.close() + } + } catch { + case _: Exception => () + } + } + /** * Singal the port storage writer to flush the remaining buffer and wait for commits to finish so that * the output port is properly completed. If the output port does not need storage, no action will be done. @@ -280,6 +299,7 @@ class OutputManager( } private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: URI): Unit = { + this.storageUris(portId.id) = storageUri val bufferedItemWriter = DocumentFactory .openDocument(storageUri) ._1 diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index e490cde3d9..5be5d942e5 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -20,7 +20,8 @@ package org.apache.texera.amber.engine.architecture.scheduling import org.apache.pekko.pattern.gracefulStop -import com.twitter.util.{Future, Return, Throw} +import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer, Return, Throw, Timer} +import org.apache.texera.amber.core.state.State import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity @@ -61,7 +62,7 @@ import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutions import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference -import scala.concurrent.duration.Duration +import scala.concurrent.duration.{Duration => ScalaDuration} /** * The executor of a region. @@ -109,10 +110,14 @@ class RegionExecutionCoordinator( private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new AtomicReference( Unexecuted ) + private val terminationFutureRef: AtomicReference[Future[Unit]] = new AtomicReference(null) + private val killRetryTimer: Timer = new JavaTimer(true) + private val killRetryDelay: TwitterDuration = TwitterDuration.fromMilliseconds(200) /** * Sync the status of `RegionExecution` and transition this coordinator's phase to `Completed` only when the - * coordinator is currently in `ExecutingNonDependeePortsPhase` and all the ports of this region are completed. + * coordinator is currently in `ExecutingNonDependeePortsPhase`, all the ports of this region are completed, and + * all workers in this region are terminated. * * Additionally, this method will also terminate all the workers of this region: * @@ -135,12 +140,22 @@ class RegionExecutionCoordinator( return Future.Unit } - // Set this coordinator's status to be completed so that subsequent regions can be started by - // WorkflowExecutionCoordinator. - setPhase(Completed) - - // Terminate all the workers in this region. - terminateWorkers(regionExecution) + val existingTerminationFuture = terminationFutureRef.get + if (existingTerminationFuture != null) { + existingTerminationFuture + } else { + val terminationFuture = terminateWorkersWithRetry(regionExecution).flatMap { _ => + // Set this coordinator's status to be completed so that subsequent regions can be started by + // WorkflowExecutionCoordinator. + setPhase(Completed) + Future.Unit + } + if (terminationFutureRef.compareAndSet(null, terminationFuture)) { + terminationFuture + } else { + terminationFutureRef.get + } + } } private def terminateWorkers(regionExecution: RegionExecution) = { @@ -167,7 +182,7 @@ class RegionExecutionCoordinator( val actorRef = actorRefService.getActorRef(workerId) // Remove the actorRef so that no other actors can find the worker and send messages. actorRefService.removeActorRef(workerId) - gracefulStop(actorRef, Duration(5, TimeUnit.SECONDS)).asTwitter() + gracefulStop(actorRef, ScalaDuration(5, TimeUnit.SECONDS)).asTwitter() } }.toSeq @@ -191,8 +206,29 @@ class RegionExecutionCoordinator( } } + private def terminateWorkersWithRetry( + regionExecution: RegionExecution, + attempt: Int = 1 + ): Future[Unit] = { + terminateWorkers(regionExecution).rescue { case err => + logger.warn( + s"Failed to terminate region ${region.id.id} on attempt $attempt. Retrying in ${killRetryDelay.inMilliseconds} ms.", + err + ) + Future + .sleep(killRetryDelay)(killRetryTimer) + .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1)) + } + } + def isCompleted: Boolean = currentPhaseRef.get == Completed + /** + * Returns the region termination future if termination has been initiated. + * This is only set by `tryCompleteRegionExecution()`. + */ + def getTerminationFutureOpt: Option[Future[Unit]] = Option(terminationFutureRef.get) + /** * This will sync and transition the region execution phase from one to another depending on its current phase: * @@ -528,12 +564,14 @@ class RegionExecutionCoordinator( portConfigs.foreach { case (outputPortId, portConfig) => val storageUriToAdd = portConfig.storageURI + val stateUriToAdd = State.stateUriFromResultUri(storageUriToAdd) val (_, eid, _, _) = decodeURI(storageUriToAdd) val schemaOptional = region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3 val schema = schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing")) DocumentFactory.createDocument(storageUriToAdd, schema) + DocumentFactory.createDocument(stateUriToAdd, State.schema) WorkflowExecutionsResource.insertOperatorPortResultUri( eid = eid, globalPortId = outputPortId, diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index 3aa5fa90a4..65c560ee59 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -126,6 +126,7 @@ class DataProcessor( val outputState = executor.processState(state, port) if (outputState.isDefined) { outputManager.emitState(outputState.get) + outputManager.saveStateToStorageIfNeeded(state) } } catch safely { case e => diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala index 10fbbc44a2..acada743bc 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala @@ -21,6 +21,7 @@ package org.apache.texera.amber.engine.architecture.worker.managers import io.grpc.MethodDescriptor import org.apache.texera.amber.config.ApplicationConfig +import org.apache.texera.amber.core.state.State import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.storage.model.VirtualDocument import org.apache.texera.amber.core.tuple.Tuple @@ -45,7 +46,11 @@ import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{ DPInputQueueElement, FIFOMessageElement } -import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage} +import org.apache.texera.amber.engine.common.ambermessage.{ + DataFrame, + StateFrame, + WorkflowFIFOMessage +} import org.apache.texera.amber.util.VirtualIdentityUtils.getFromActorIdForInputPortStorage import java.net.URI @@ -106,6 +111,25 @@ class InputPortMaterializationReaderThread( } // Flush any remaining tuples in the buffer. if (buffer.nonEmpty) flush() + + try { + val state_document = + DocumentFactory + .openDocument(State.stateUriFromResultUri(uri)) + ._1 + .asInstanceOf[VirtualDocument[Tuple]] + val stateReadIterator = state_document.get() + + while (stateReadIterator.hasNext) { + val state = State.deserialize(stateReadIterator.next()) + inputMessageQueue.put( + FIFOMessageElement(WorkflowFIFOMessage(channelId, getSequenceNumber, StateFrame(state))) + ) + } + } catch { + case _: Exception => + } + emitECM(METHOD_END_CHANNEL, PORT_ALIGNMENT) isFinished.set(true) } catch { diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala index 4957f31a40..f76a314b7a 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} import org.apache.texera.amber.util.JSONUtils.objectMapper +import java.net.URI import java.util.Base64 import scala.jdk.CollectionConverters.IteratorHasAsScala @@ -36,6 +37,9 @@ object State { new Attribute(StateContent, AttributeType.STRING) ) + def stateUriFromResultUri(resultUri: URI): URI = + new URI(resultUri.toString.replace("/result", "/state")) + def serialize(state: State): Tuple = { val payloadJson = objectMapper.writeValueAsString(toJsonValue(state)) Tuple.builder(schema).addSequentially(Array(payloadJson)).build() diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala index 15949ef471..ae37def667 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala @@ -72,6 +72,7 @@ object DocumentFactory { case RESULT => StorageConfig.icebergTableResultNamespace case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case STATE => "state" case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } @@ -119,6 +120,7 @@ object DocumentFactory { case RESULT => StorageConfig.icebergTableResultNamespace case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case STATE => "state" case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala index 3513ac5ecd..990776a69f 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala @@ -34,6 +34,7 @@ object VFSResourceType extends Enumeration { val RESULT: Value = Value("result") val RUNTIME_STATISTICS: Value = Value("runtimeStatistics") val CONSOLE_MESSAGES: Value = Value("consoleMessages") + val STATE: Value = Value("state") } object VFSURIFactory {
