This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit ea3e7de3deb5036e5825bebc4e7b4e0a4e3f0c38 Author: Xiaozhen Liu <[email protected]> AuthorDate: Tue Nov 25 10:46:09 2025 -0800 feat(cache): add available output port caches in WorkflowSettings --- .../user/workflow/WorkflowExecutionsResource.scala | 31 ++++++++++++++++++++++ .../web/service/WorkflowExecutionService.scala | 23 ++++++++++++++++ .../amber/core/workflow/WorkflowSettings.scala | 15 ++++++++++- 3 files changed, 68 insertions(+), 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 8321b0c203..d98d88334d 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -554,6 +554,37 @@ object WorkflowExecutionsResource { .map(URI.create) } + /** + * Lookup a cache entry by workflow, port, and subdag hash. + */ + def getOperatorPortCache( + wid: WorkflowIdentity, + globalPortId: GlobalPortIdentity, + subdagHash: String + ): Option[(URI, Option[Long], String, Option[ExecutionIdentity])] = { + context + .select( + OPERATOR_PORT_CACHE.RESULT_URI, + OPERATOR_PORT_CACHE.TUPLE_COUNT, + OPERATOR_PORT_CACHE.FINGERPRINT_JSON, + OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID + ) + .from(OPERATOR_PORT_CACHE) + .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(wid.id.toInt)) + .and(OPERATOR_PORT_CACHE.GLOBAL_PORT_ID.eq(globalPortId.serializeAsString)) + .and(OPERATOR_PORT_CACHE.SUBDAG_HASH.eq(subdagHash)) + .fetchOptional() + .toScala + .map { record => + val uri = URI.create(record.get(OPERATOR_PORT_CACHE.RESULT_URI)) + val tupleCount = Option(record.get(OPERATOR_PORT_CACHE.TUPLE_COUNT)).map(_.longValue()) + val fp = record.get(OPERATOR_PORT_CACHE.FINGERPRINT_JSON) + val sourceEid = + Option(record.get(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID)).map(id => ExecutionIdentity(id.longValue())) + (uri, tupleCount, fp, sourceEid) + } + } + case class WorkflowExecutionEntry( eId: Integer, vId: Integer, diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala index bfbff230e1..3d50a9eb89 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala @@ -28,6 +28,9 @@ import org.apache.amber.engine.architecture.rpc.controlreturns.WorkflowAggregate import org.apache.amber.engine.common.Utils import org.apache.amber.engine.common.client.AmberClient import org.apache.amber.engine.common.executionruntimestate.ExecutionMetadataStore +import org.apache.amber.core.workflow.cache.FingerprintUtil +import org.apache.amber.core.workflow.{CachedOutput, GlobalPortIdentity} +import org.apache.amber.util.serde.GlobalPortIdentitySerde.SerdeOps import org.apache.texera.web.model.websocket.event.{ TexeraWebSocketEvent, WorkflowErrorEvent, @@ -103,10 +106,30 @@ class WorkflowExecutionService( var executionRuntimeService: ExecutionRuntimeService = _ var executionConsoleService: ExecutionConsoleService = _ + private def computeCachedOutputs( + physicalPlan: org.apache.amber.core.workflow.PhysicalPlan + ): Map[String, CachedOutput] = { + physicalPlan.operators + .flatMap(op => op.outputPorts.keys.map(pid => GlobalPortIdentity(op.id, pid))) + .flatMap { gpid => + val fingerprint = FingerprintUtil.computeSubdagFingerprint(physicalPlan, gpid) + WorkflowExecutionsResource + .getOperatorPortCache(workflowContext.workflowId, gpid, fingerprint.subdagHash) + .map { + case (uri, tupleCount, fpJson, sourceEid) => + gpid.serializeAsString -> CachedOutput(uri, fpJson, tupleCount, sourceEid) + } + } + .toMap + } + def executeWorkflow(): Unit = { try { workflow = new WorkflowCompiler(workflowContext) .compile(request.logicalPlan) + val cachedOutputs = computeCachedOutputs(workflow.physicalPlan) + workflowContext.workflowSettings = + workflowContext.workflowSettings.copy(cachedOutputs = cachedOutputs) } catch { case err: Throwable => errorHandler(err) diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/workflow/WorkflowSettings.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/workflow/WorkflowSettings.scala index 679da0d3ee..d44bd63c3c 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/core/workflow/WorkflowSettings.scala +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/workflow/WorkflowSettings.scala @@ -19,7 +19,20 @@ package org.apache.amber.core.workflow +import org.apache.amber.core.virtualidentity.ExecutionIdentity + +import java.net.URI + +case class CachedOutput( + resultUri: URI, + fingerprintJson: String, + tupleCount: Option[Long], + sourceExecutionId: Option[ExecutionIdentity] +) + case class WorkflowSettings( dataTransferBatchSize: Int, - outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty + outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty, + // serialized GlobalPortIdentity -> cached output + cachedOutputs: Map[String, CachedOutput] = Map.empty )
