This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit ea3e7de3deb5036e5825bebc4e7b4e0a4e3f0c38
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Tue Nov 25 10:46:09 2025 -0800

    feat(cache): add available output port caches in WorkflowSettings
---
 .../user/workflow/WorkflowExecutionsResource.scala | 31 ++++++++++++++++++++++
 .../web/service/WorkflowExecutionService.scala     | 23 ++++++++++++++++
 .../amber/core/workflow/WorkflowSettings.scala     | 15 ++++++++++-
 3 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
index 8321b0c203..d98d88334d 100644
--- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
+++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
@@ -554,6 +554,37 @@ object WorkflowExecutionsResource {
       .map(URI.create)
   }
 
+  /**
+    * Lookup a cache entry by workflow, port, and subdag hash.
+    */
+  def getOperatorPortCache(
+      wid: WorkflowIdentity,
+      globalPortId: GlobalPortIdentity,
+      subdagHash: String
+  ): Option[(URI, Option[Long], String, Option[ExecutionIdentity])] = {
+    context
+      .select(
+        OPERATOR_PORT_CACHE.RESULT_URI,
+        OPERATOR_PORT_CACHE.TUPLE_COUNT,
+        OPERATOR_PORT_CACHE.FINGERPRINT_JSON,
+        OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID
+      )
+      .from(OPERATOR_PORT_CACHE)
+      .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(wid.id.toInt))
+      .and(OPERATOR_PORT_CACHE.GLOBAL_PORT_ID.eq(globalPortId.serializeAsString))
+      .and(OPERATOR_PORT_CACHE.SUBDAG_HASH.eq(subdagHash))
+      .fetchOptional()
+      .toScala
+      .map { record =>
+        val uri = URI.create(record.get(OPERATOR_PORT_CACHE.RESULT_URI))
+        val tupleCount = Option(record.get(OPERATOR_PORT_CACHE.TUPLE_COUNT)).map(_.longValue())
+        val fp = record.get(OPERATOR_PORT_CACHE.FINGERPRINT_JSON)
+        val sourceEid =
+          Option(record.get(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID)).map(id => ExecutionIdentity(id.longValue()))
+        (uri, tupleCount, fp, sourceEid)
+      }
+  }
+
   case class WorkflowExecutionEntry(
       eId: Integer,
       vId: Integer,
diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
index bfbff230e1..3d50a9eb89 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
@@ -28,6 +28,9 @@ import org.apache.amber.engine.architecture.rpc.controlreturns.WorkflowAggregate
 import org.apache.amber.engine.common.Utils
 import org.apache.amber.engine.common.client.AmberClient
 import org.apache.amber.engine.common.executionruntimestate.ExecutionMetadataStore
+import org.apache.amber.core.workflow.cache.FingerprintUtil
+import org.apache.amber.core.workflow.{CachedOutput, GlobalPortIdentity}
+import org.apache.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
 import org.apache.texera.web.model.websocket.event.{
   TexeraWebSocketEvent,
   WorkflowErrorEvent,
@@ -103,10 +106,30 @@ class WorkflowExecutionService(
   var executionRuntimeService: ExecutionRuntimeService = _
   var executionConsoleService: ExecutionConsoleService = _
 
+  private def computeCachedOutputs(
+      physicalPlan: org.apache.amber.core.workflow.PhysicalPlan
+  ): Map[String, CachedOutput] = {
+    physicalPlan.operators
+      .flatMap(op => op.outputPorts.keys.map(pid => GlobalPortIdentity(op.id, pid)))
+      .flatMap { gpid =>
+        val fingerprint = FingerprintUtil.computeSubdagFingerprint(physicalPlan, gpid)
+        WorkflowExecutionsResource
+          .getOperatorPortCache(workflowContext.workflowId, gpid, fingerprint.subdagHash)
+          .map {
+            case (uri, tupleCount, fpJson, sourceEid) =>
+              gpid.serializeAsString -> CachedOutput(uri, fpJson, tupleCount, sourceEid)
+          }
+      }
+      .toMap
+  }
+
   def executeWorkflow(): Unit = {
     try {
       workflow = new WorkflowCompiler(workflowContext)
         .compile(request.logicalPlan)
+      val cachedOutputs = computeCachedOutputs(workflow.physicalPlan)
+      workflowContext.workflowSettings =
+        workflowContext.workflowSettings.copy(cachedOutputs = cachedOutputs)
     } catch {
       case err: Throwable =>
         errorHandler(err)
diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/workflow/WorkflowSettings.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/workflow/WorkflowSettings.scala
index 679da0d3ee..d44bd63c3c 100644
--- a/common/workflow-core/src/main/scala/org/apache/amber/core/workflow/WorkflowSettings.scala
+++ b/common/workflow-core/src/main/scala/org/apache/amber/core/workflow/WorkflowSettings.scala
@@ -19,7 +19,20 @@
 
 package org.apache.amber.core.workflow
 
+import org.apache.amber.core.virtualidentity.ExecutionIdentity
+
+import java.net.URI
+
+case class CachedOutput(
+    resultUri: URI,
+    fingerprintJson: String,
+    tupleCount: Option[Long],
+    sourceExecutionId: Option[ExecutionIdentity]
+)
+
 case class WorkflowSettings(
     dataTransferBatchSize: Int,
-    outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty
+    outputPortsNeedingStorage: Set[GlobalPortIdentity] = Set.empty,
+    // serialized GlobalPortIdentity -> cached output
+    cachedOutputs: Map[String, CachedOutput] = Map.empty
 )

Reply via email to