This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit a6f41909de029b29793ed4b34615ab027207a599
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Mon Nov 24 21:22:33 2025 -0800

    feat(cache): add operator port cache table, fingerprint util, and persist on port completion
---
 .../controller/ControllerProcessor.scala           |   2 +-
 .../promisehandlers/PortCompletedHandler.scala     |  43 ++++-
 .../user/workflow/WorkflowExecutionsResource.scala |  51 ++++++
 .../texera/web/service/WorkflowService.scala       |  10 +-
 .../scheduling/FingerprintUtilSpec.scala           | 138 +++++++++++++++
 .../core/workflow/cache/FingerprintUtil.scala      | 192 +++++++++++++++++++++
 sql/texera_ddl.sql                                 |  16 ++
 sql/updates/16.sql                                 |  45 +++++
 8 files changed, 482 insertions(+), 15 deletions(-)

diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala
index 2f7342fa3e..a07066390e 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala
@@ -34,7 +34,8 @@ import org.apache.amber.engine.architecture.worker.WorkflowWorker.MainThreadDele
 import org.apache.amber.engine.common.ambermessage.WorkflowFIFOMessage
 
 class ControllerProcessor(
-    workflowContext: WorkflowContext,
+    // Exposed as a val so controller RPC handlers can read the workflow and execution ids.
+    val workflowContext: WorkflowContext,
     controllerConfig: ControllerConfig,
     actorId: ActorVirtualIdentity,
     outputHandler: Either[MainThreadDelegateMessage, WorkflowFIFOMessage] => Unit
diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
index 8c0b909af5..c4de572761 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
@@ -21,19 +21,24 @@ package org.apache.amber.engine.architecture.controller.promisehandlers
 
 import com.twitter.util.Future
 import org.apache.amber.core.WorkflowRuntimeException
+import org.apache.amber.core.storage.DocumentFactory
 import org.apache.amber.core.workflow.GlobalPortIdentity
+import org.apache.amber.core.workflow.cache.FingerprintUtil
 import org.apache.amber.engine.architecture.controller.{
   ControllerAsyncRPCHandlerInitializer,
   FatalError
 }
 import org.apache.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
   PortCompletedRequest,
   QueryStatisticsRequest
 }
 import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import org.apache.amber.engine.common.virtualidentity.util.CONTROLLER
 import org.apache.amber.util.VirtualIdentityUtils
+import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
+
+import scala.util.control.NonFatal
 
 /** Notify the completion of a port:
   * - For input port, it means the worker has finished consuming and processing all the data
@@ -74,6 +79,48 @@ trait PortCompletedHandler {
               else operatorExecution.isOutputPortCompleted(msg.portId)
 
             if (isPortCompleted) {
+              // If this is an output port whose result was materialized, persist cache
+              // metadata. This is best-effort bookkeeping that runs before region
+              // coordination below, so non-fatal failures (e.g. DB or storage errors)
+              // are contained here rather than aborting port-completion handling.
+              if (!msg.input) {
+                try {
+                  val storageUriOpt =
+                    WorkflowExecutionsResource.getResultUriByPhysicalPortId(
+                      cp.workflowContext.executionId,
+                      globalPortId
+                    )
+                  // No-op when the port has no recorded result URI or no plan is available.
+                  for {
+                    uri <- storageUriOpt
+                    plan <- Option(cp.workflowScheduler.physicalPlan)
+                  } {
+                    val fingerprint =
+                      FingerprintUtil.computeSubdagFingerprint(plan, globalPortId)
+                    // The tuple count is optional metadata: if the result document cannot
+                    // be opened or counted, the entry is still recorded without it.
+                    val tupleCount =
+                      try {
+                        Some(DocumentFactory.openDocument(uri)._1.getCount)
+                      } catch {
+                        case NonFatal(_) => None
+                      }
+                    WorkflowExecutionsResource.upsertOperatorPortCache(
+                      cp.workflowContext.workflowId,
+                      globalPortId,
+                      fingerprint.subdagHash,
+                      fingerprint.fingerprintJson,
+                      uri,
+                      tupleCount,
+                      Some(cp.workflowContext.executionId)
+                    )
+                  }
+                } catch {
+                  // TODO: report the failure through the controller's logger.
+                  case NonFatal(_) => // cache persistence is best-effort
+                }
+              }
+
               cp.workflowExecutionCoordinator
                 .coordinateRegionExecutors(cp.actorService)
                 // Since this message is sent from a worker, any exception from the above code will be returned to that worker.
diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
index ab31ab0cd2..8321b0c203 100644
--- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
+++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
@@ -50,6 +50,7 @@ import javax.ws.rs._
 import javax.ws.rs.core.{MediaType, Response}
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters._
 
 object WorkflowExecutionsResource {
   final private lazy val context = SqlServer
@@ -469,6 +470,61 @@ object WorkflowExecutionsResource {
     }
   }
 
+  /**
+    * Upserts the cache entry for an operator port, keyed by
+    * (workflowId, globalPortId, subdagHash).
+    *
+    * On conflict, the existing row's result URI, fingerprint JSON, tuple count, source
+    * execution id and `updated_at` timestamp are overwritten; a `None` tupleCount or
+    * sourceExecutionId is stored as NULL.
+    *
+    * @param workflowId        workflow that owns the cached result
+    * @param globalPortId      physical port whose result is cached
+    * @param subdagHash        hash identifying the port's upstream subDAG
+    * @param fingerprintJson   payload from which `subdagHash` was derived
+    * @param resultUri         storage URI of the materialized result
+    * @param tupleCount        number of tuples in the result, if known
+    * @param sourceExecutionId execution that produced the result, if known
+    */
+  def upsertOperatorPortCache(
+      workflowId: WorkflowIdentity,
+      globalPortId: GlobalPortIdentity,
+      subdagHash: String,
+      fingerprintJson: String,
+      resultUri: java.net.URI,
+      tupleCount: Option[Long],
+      sourceExecutionId: Option[ExecutionIdentity]
+  ): Unit = {
+    val record = context.newRecord(OPERATOR_PORT_CACHE)
+    record.setWorkflowId(workflowId.id.toInt)
+    record.setGlobalPortId(globalPortId.serializeAsString)
+    record.setSubdagHash(subdagHash)
+    record.setFingerprintJson(fingerprintJson)
+    record.setResultUri(resultUri.toString)
+    tupleCount.foreach(c => record.setTupleCount(Long.box(c)))
+    sourceExecutionId.foreach(eid => record.setSourceExecutionId(Long.box(eid.id)))
+
+    context
+      .insertInto(OPERATOR_PORT_CACHE)
+      .set(record)
+      .onConflict(
+        OPERATOR_PORT_CACHE.WORKFLOW_ID,
+        OPERATOR_PORT_CACHE.GLOBAL_PORT_ID,
+        OPERATOR_PORT_CACHE.SUBDAG_HASH
+      )
+      .doUpdate()
+      .set(OPERATOR_PORT_CACHE.RESULT_URI, record.getResultUri)
+      .set(OPERATOR_PORT_CACHE.FINGERPRINT_JSON, record.getFingerprintJson)
+      .set(OPERATOR_PORT_CACHE.TUPLE_COUNT, record.getTupleCount)
+      .set(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID, record.getSourceExecutionId)
+      // The column default only applies on INSERT; refresh the timestamp on UPDATE too.
+      .set(
+        OPERATOR_PORT_CACHE.UPDATED_AT,
+        org.jooq.impl.DSL.field("now()", OPERATOR_PORT_CACHE.UPDATED_AT.getDataType)
+      )
+      .execute()
+  }
+
   /**
     * This method is mainly used for frontend requests. Given a logicalOpId and an outputPortId of an execution,
     * this method finds the URI for a globalPortId that both: 1. matches the logicalOpId and outputPortId, and
@@ -503,6 +559,31 @@ object WorkflowExecutionsResource {
     urisOfEid.find(isMatchingExternalPortURI)
   }
 
+  /**
+    * Look up the result URI recorded in operator_port_executions for the given execution
+    * and physical port.
+    *
+    * NOTE(review): `fetchOptionalInto` throws if more than one row matches; this assumes
+    * (execution, port) identifies at most one row — confirm against the table's key.
+    *
+    * @param eid          execution whose port results are searched
+    * @param globalPortId physical port, matched via its serialized string form
+    * @return the stored URI, or None if no matching row exists
+    */
+  def getResultUriByPhysicalPortId(
+      eid: ExecutionIdentity,
+      globalPortId: GlobalPortIdentity
+  ): Option[URI] = {
+    context
+      .select(OPERATOR_PORT_EXECUTIONS.RESULT_URI)
+      .from(OPERATOR_PORT_EXECUTIONS)
+      .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
+      .and(OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID.eq(globalPortId.serializeAsString))
+      .fetchOptionalInto(classOf[String])
+      .toScala
+      .map(URI.create)
+  }
+
   case class WorkflowExecutionEntry(
       eId: Integer,
       vId: Integer,
diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
index 01c66fb458..74244870d8 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
@@ -192,11 +192,9 @@ class WorkflowService(
     var controllerConf = ControllerConfig.default
 
-    // clean up results from previous run
-    val previousExecutionId =
-      WorkflowExecutionService.getLatestExecutionId(workflowId, req.computingUnitId)
-    previousExecutionId.foreach(eid => {
-      clearExecutionResources(eid)
-    }) // TODO: change this behavior after enabling cache.
+    // Results of previous executions are intentionally not cleared here: operator_port_cache
+    // rows may reference result URIs produced by earlier executions, and deleting those
+    // resources (as the previous cleanup did) would leave cache entries pointing at missing data.
+    // TODO: reclaim storage of previous executions that no cache entry references.
 
     workflowContext.executionId = ExecutionsMetadataPersistService.insertNewExecution(
       workflowContext.workflowId,
diff --git a/amber/src/test/scala/org/apache/amber/engine/architecture/scheduling/FingerprintUtilSpec.scala b/amber/src/test/scala/org/apache/amber/engine/architecture/scheduling/FingerprintUtilSpec.scala
new file mode 100644
index 0000000000..b85c06801e
--- /dev/null
+++ b/amber/src/test/scala/org/apache/amber/engine/architecture/scheduling/FingerprintUtilSpec.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.engine.architecture.scheduling
+
+import org.apache.amber.core.workflow.{GlobalPortIdentity, PortIdentity, WorkflowContext}
+import org.apache.amber.core.workflow.cache.FingerprintUtil
+import org.apache.amber.engine.e2e.TestUtils.buildWorkflow
+import org.apache.amber.operator.TestOperators
+import org.apache.amber.operator.aggregate.{AggregateOpDesc, AggregationFunction}
+import org.apache.amber.operator.keywordSearch.KeywordSearchOpDesc
+import org.apache.amber.operator.source.scan.csv.CSVScanSourceOpDesc
+import org.apache.texera.workflow.LogicalLink
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+  * Tests for [[FingerprintUtil.computeSubdagFingerprint]]: the subDAG hash must be stable
+  * for identical plans and must change when the target port's upstream subDAG changes.
+  */
+class FingerprintUtilSpec extends AnyFlatSpec with Matchers {
+
+  private def newCsv(): CSVScanSourceOpDesc =
+    TestOperators.headerlessSmallCsvScanOpDesc()
+
+  private def newKeyword(
+      pattern: String = "Asia",
+      logicalId: Option[String] = None
+  ): KeywordSearchOpDesc = {
+    val op = TestOperators.keywordSearchOpDesc("column-1", pattern)
+    logicalId.foreach(id => op.setOperatorId(id))
+    op
+  }
+
+  private def newGroupBy(): AggregateOpDesc =
+    TestOperators.aggregateAndGroupByDesc("column-1", AggregationFunction.COUNT, List[String]())
+
+  /** Builds the two-operator workflow `csv -> keyword`. */
+  private def buildSimpleWorkflow(csv: CSVScanSourceOpDesc, keyword: KeywordSearchOpDesc) =
+    buildWorkflow(
+      List(csv, keyword),
+      List(
+        LogicalLink(
+          csv.operatorIdentifier,
+          PortIdentity(0),
+          keyword.operatorIdentifier,
+          PortIdentity(0)
+        )
+      ),
+      new WorkflowContext()
+    )
+
+  "FingerprintUtil" should "produce stable hash for identical workflows" in {
+    val csv = newCsv()
+    val keyword = newKeyword()
+    // Build the same logical workflow twice so the two physical plans are distinct objects.
+    val workflow1 = buildSimpleWorkflow(csv, keyword)
+    val workflow2 = buildSimpleWorkflow(csv, keyword)
+    val target = GlobalPortIdentity(
+      workflow1.physicalPlan.getPhysicalOpsOfLogicalOp(keyword.operatorIdentifier).head.id,
+      PortIdentity(0, internal = false)
+    )
+    val f1 = FingerprintUtil.computeSubdagFingerprint(workflow1.physicalPlan, target)
+    val f2 = FingerprintUtil.computeSubdagFingerprint(workflow2.physicalPlan, target)
+    f1.subdagHash shouldEqual f2.subdagHash
+    f1.fingerprintJson shouldEqual f2.fingerprintJson
+  }
+
+  it should "change hash when operator configuration changes" in {
+    val csv = newCsv()
+    val keyword = newKeyword()
+    val workflow = buildSimpleWorkflow(csv, keyword)
+    val target = GlobalPortIdentity(
+      workflow.physicalPlan.getPhysicalOpsOfLogicalOp(keyword.operatorIdentifier).head.id,
+      PortIdentity(0, internal = false)
+    )
+    // Same logical id (hence same target port), different keyword search pattern.
+    val modifiedKeyword = newKeyword("Europe", logicalId = Some(keyword.operatorIdentifier.id))
+    val modifiedWorkflow = buildSimpleWorkflow(csv, modifiedKeyword)
+    val f1 = FingerprintUtil.computeSubdagFingerprint(workflow.physicalPlan, target)
+    val f2 = FingerprintUtil.computeSubdagFingerprint(modifiedWorkflow.physicalPlan, target)
+    f1.subdagHash should not equal f2.subdagHash
+  }
+
+  it should "produce different hashes for different target ports" in {
+    val csv = newCsv()
+    val keyword = newKeyword()
+    val groupBy = newGroupBy()
+    val workflow = buildWorkflow(
+      List(csv, keyword, groupBy),
+      List(
+        LogicalLink(
+          csv.operatorIdentifier,
+          PortIdentity(0),
+          keyword.operatorIdentifier,
+          PortIdentity(0)
+        ),
+        LogicalLink(
+          keyword.operatorIdentifier,
+          PortIdentity(0),
+          groupBy.operatorIdentifier,
+          PortIdentity(0)
+        )
+      ),
+      new WorkflowContext()
+    )
+
+    val targetKeyword = GlobalPortIdentity(
+      workflow.physicalPlan.getPhysicalOpsOfLogicalOp(keyword.operatorIdentifier).head.id,
+      PortIdentity(0, internal = false)
+    )
+    val targetGroupBy = GlobalPortIdentity(
+      workflow.physicalPlan.getPhysicalOpsOfLogicalOp(groupBy.operatorIdentifier).head.id,
+      PortIdentity(0, internal = false)
+    )
+
+    val fKeyword = FingerprintUtil.computeSubdagFingerprint(workflow.physicalPlan, targetKeyword)
+    val fGroupBy = FingerprintUtil.computeSubdagFingerprint(workflow.physicalPlan, targetGroupBy)
+    fKeyword.subdagHash should not equal fGroupBy.subdagHash
+  }
+
+  it should "change hash when upstream wiring changes" in {
+    val csv = newCsv()
+    val keyword = newKeyword()
+    val groupBy = newGroupBy()
+    // csv -> keyword -> groupBy
+    val viaKeyword = buildWorkflow(
+      List(csv, keyword, groupBy),
+      List(
+        LogicalLink(
+          csv.operatorIdentifier,
+          PortIdentity(0),
+          keyword.operatorIdentifier,
+          PortIdentity(0)
+        ),
+        LogicalLink(
+          keyword.operatorIdentifier,
+          PortIdentity(0),
+          groupBy.operatorIdentifier,
+          PortIdentity(0)
+        )
+      ),
+      new WorkflowContext()
+    )
+    // csv -> groupBy: same target operator, but keyword search is no longer upstream.
+    val direct = buildWorkflow(
+      List(csv, groupBy),
+      List(
+        LogicalLink(
+          csv.operatorIdentifier,
+          PortIdentity(0),
+          groupBy.operatorIdentifier,
+          PortIdentity(0)
+        )
+      ),
+      new WorkflowContext()
+    )
+    val target = GlobalPortIdentity(
+      viaKeyword.physicalPlan.getPhysicalOpsOfLogicalOp(groupBy.operatorIdentifier).head.id,
+      PortIdentity(0, internal = false)
+    )
+    val f1 = FingerprintUtil.computeSubdagFingerprint(viaKeyword.physicalPlan, target)
+    val f2 = FingerprintUtil.computeSubdagFingerprint(direct.physicalPlan, target)
+    f1.subdagHash should not equal f2.subdagHash
+  }
+}
diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/workflow/cache/FingerprintUtil.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/workflow/cache/FingerprintUtil.scala
new file mode 100644
index 0000000000..0ec50917f5
--- /dev/null
+++ b/common/workflow-core/src/main/scala/org/apache/amber/core/workflow/cache/FingerprintUtil.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.core.workflow.cache
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.node.ObjectNode
+import org.apache.amber.core.executor.OpExecInitInfo
+import org.apache.amber.core.workflow.{GlobalPortIdentity, PhysicalLink, PhysicalOp, PhysicalPlan}
+
+import java.nio.charset.StandardCharsets
+import java.security.MessageDigest
+
+/**
+  * Fingerprint of the subDAG upstream of an output port.
+  *
+  * @param fingerprintJson canonical JSON payload describing the subDAG
+  * @param subdagHash      lowercase hex SHA-256 digest of `fingerprintJson` (UTF-8 bytes)
+  */
+case class Fingerprint(fingerprintJson: String, subdagHash: String)
+
+/**
+  * Computes deterministic fingerprints for upstream subDAGs to back cache keys.
+  *
+  * The fingerprint payload captures:
+  *   - the target output port,
+  *   - the target's operator and all physical operators upstream of it (sorted by id),
+  *   - their exec init info (proto string),
+  *   - their output schemas (string form when available),
+  *   - all edges between those operators (sorted).
+  *
+  * Determinism comes from a fixed field insertion order in the JSON tree plus explicit
+  * sorting of every collection; Jackson does not reorder `ObjectNode` fields by itself.
+  * The payload is hashed via SHA-256: identical plans produce identical hashes, and any
+  * change to the captured structure or config changes the payload and hence the hash.
+  */
+object FingerprintUtil {
+
+  /**
+    * Compute a deterministic fingerprint of the upstream subDAG for the given output port.
+    *
+    * @param plan   physical plan containing the target operator
+    * @param target output port whose upstream subDAG is fingerprinted
+    * @return the canonical JSON payload together with its SHA-256 hex digest
+    */
+  def computeSubdagFingerprint(
+      plan: PhysicalPlan,
+      target: GlobalPortIdentity
+  ): Fingerprint = {
+    val subdag = collectUpstream(plan, target)
+    val payload = buildCanonicalPayload(subdag.nodes, subdag.links, target)
+    val fingerprintJson = objectMapper.writeValueAsString(payload)
+    Fingerprint(fingerprintJson, sha256Hex(fingerprintJson))
+  }
+
+  private case class SubDag(nodes: Set[PhysicalOp], links: Set[PhysicalLink])
+
+  /**
+    * Collect the target port's operator plus every operator and link reached by walking
+    * upstream links breadth-first from it. This is limited to the part of the plan feeding
+    * the target.
+    */
+  private def collectUpstream(plan: PhysicalPlan, target: GlobalPortIdentity): SubDag = {
+    val visitedOps = scala.collection.mutable.Set(target.opId)
+    val visitedLinks = scala.collection.mutable.Set[PhysicalLink]()
+    val queue = scala.collection.mutable.Queue(target.opId)
+
+    while (queue.nonEmpty) {
+      val current = queue.dequeue()
+      plan.getUpstreamPhysicalLinks(current).foreach { link =>
+        visitedLinks.add(link)
+        // mutable.Set#add returns true only when the operator was not visited yet.
+        if (visitedOps.add(link.fromOpId)) {
+          queue.enqueue(link.fromOpId)
+        }
+      }
+    }
+
+    SubDag(
+      nodes = visitedOps.map(plan.getOperator).toSet,
+      links = visitedLinks.toSet.filter(link =>
+        visitedOps.contains(link.fromOpId) && visitedOps.contains(link.toOpId)
+      )
+    )
+  }
+
+  /**
+    * Build a canonical JSON payload describing the subDAG:
+    *  - target port
+    *  - nodes sorted by physical op id, with exec info and schemas
+    *  - edges sorted by (from op, from port, to op, to port), port internal flags included
+    *    so that the order never depends on set iteration order
+    */
+  private def buildCanonicalPayload(
+      nodes: Set[PhysicalOp],
+      links: Set[PhysicalLink],
+      target: GlobalPortIdentity
+  ): ObjectNode = {
+    val root = objectMapper.createObjectNode()
+    root.put("targetPort", target.toString)
+
+    val nodeArray = objectMapper.createArrayNode()
+    nodes.toList
+      .sortBy(_.id.toString)
+      .foreach(op => nodeArray.add(buildNode(op)))
+    root.set("nodes", nodeArray)
+
+    val edgeArray = objectMapper.createArrayNode()
+    links.toList
+      .sortBy(link =>
+        (
+          link.fromOpId.toString,
+          link.fromPortId.id,
+          link.fromPortId.internal,
+          link.toOpId.toString,
+          link.toPortId.id,
+          link.toPortId.internal
+        )
+      )
+      .foreach(link => edgeArray.add(buildEdge(link)))
+    root.set("edges", edgeArray)
+
+    root
+  }
+
+  /**
+    * Serialize a physical operator into a deterministic JSON node.
+    * Captures IDs, exec init info, and output schemas.
+    */
+  private def buildNode(op: PhysicalOp): ObjectNode = {
+    val node = objectMapper.createObjectNode()
+    node.put("physicalOpId", op.id.toString)
+    node.put("logicalOpId", op.id.logicalOpId.toString)
+    node.set("opExec", serializeOpExec(op.opExecInitInfo))
+
+    val schemaArray = objectMapper.createArrayNode()
+    // Sort by (id, internal): an internal and an external port may share an id, and the
+    // output must not depend on the port map's iteration order.
+    op.outputPorts.toList
+      .sortBy { case (portId, _) => (portId.id, portId.internal) }
+      .foreach {
+        case (portId, (_, _, schemaEither)) =>
+          val schemaNode = objectMapper.createObjectNode()
+          schemaNode.put("portId", portId.id)
+          schemaNode.put("internal", portId.internal)
+          schemaEither.toOption match {
+            case Some(schema) =>
+              schemaNode.put("available", true)
+              schemaNode.put("schemaString", schema.toString)
+            case None =>
+              schemaNode.put("available", false)
+          }
+          schemaArray.add(schemaNode)
+      }
+    node.set("outputSchemas", schemaArray)
+
+    node
+  }
+
+  /**
+    * Serialize a physical link into a deterministic JSON node.
+    */
+  private def buildEdge(link: PhysicalLink): ObjectNode = {
+    val edge = objectMapper.createObjectNode()
+    edge.put("fromOpId", link.fromOpId.toString)
+    edge.put("fromPortId", link.fromPortId.id)
+    edge.put("fromInternal", link.fromPortId.internal)
+    edge.put("toOpId", link.toOpId.toString)
+    edge.put("toPortId", link.toPortId.id)
+    edge.put("toInternal", link.toPortId.internal)
+    edge
+  }
+
+  // Determinism is guaranteed by how the JSON tree is built (see the object doc), so the
+  // mapper needs no special configuration.
+  private val objectMapper: ObjectMapper = new ObjectMapper()
+
+  /** Lowercase hex SHA-256 digest of the UTF-8 bytes of `value`. */
+  private def sha256Hex(value: String): String = {
+    val digest = MessageDigest.getInstance("SHA-256")
+    val bytes = digest.digest(value.getBytes(StandardCharsets.UTF_8))
+    bytes.map("%02x".format(_)).mkString
+  }
+
+  /**
+    * Serialize op init info deterministically using its proto string.
+    */
+  private def serializeOpExec(opExecInitInfo: OpExecInitInfo): ObjectNode = {
+    val n = objectMapper.createObjectNode()
+    // Use canonical proto string for determinism.
+    n.put("protoString", opExecInitInfo.asMessage.toProtoString)
+    n
+  }
+}
diff --git a/sql/texera_ddl.sql b/sql/texera_ddl.sql
index 7b0f9b9063..a112413a4a 100644
--- a/sql/texera_ddl.sql
+++ b/sql/texera_ddl.sql
@@ -48,6 +48,7 @@ SET search_path TO texera_db, public;
 -- ============================================
 DROP TABLE IF EXISTS operator_executions CASCADE;
 DROP TABLE IF EXISTS operator_port_executions CASCADE;
+DROP TABLE IF EXISTS operator_port_cache CASCADE;
 DROP TABLE IF EXISTS workflow_user_access CASCADE;
 DROP TABLE IF EXISTS workflow_of_user CASCADE;
 DROP TABLE IF EXISTS user_config CASCADE;
@@ -296,6 +297,21 @@ CREATE TABLE operator_port_executions
     FOREIGN KEY (workflow_execution_id) REFERENCES workflow_executions(eid) ON DELETE CASCADE
 );
 
+-- operator_port_cache
+CREATE TABLE operator_port_cache
+(
+    workflow_id         INT NOT NULL,
+    global_port_id      VARCHAR(200) NOT NULL,
+    subdag_hash         CHAR(64) NOT NULL,
+    fingerprint_json    TEXT NOT NULL,
+    result_uri          TEXT NOT NULL,
+    tuple_count         BIGINT,
+    source_execution_id BIGINT,
+    updated_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
+    PRIMARY KEY (workflow_id, global_port_id, subdag_hash),
+    FOREIGN KEY (workflow_id) REFERENCES workflow(wid) ON DELETE CASCADE
+);
+
 -- workflow_user_likes
 CREATE TABLE IF NOT EXISTS workflow_user_likes
 (
diff --git a/sql/updates/16.sql b/sql/updates/16.sql
new file mode 100644
index 0000000000..6276738d72
--- /dev/null
+++ b/sql/updates/16.sql
@@ -0,0 +1,45 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ============================================
+-- 1. Connect to the texera_db database
+-- ============================================
+\c texera_db
+
+SET search_path TO texera_db;
+
+-- ============================================
+-- 2. Update the table schema
+-- ============================================
+BEGIN;
+
+-- Cache of materialized operator-port results, keyed by (workflow_id, global_port_id, subdag_hash).
+CREATE TABLE IF NOT EXISTS operator_port_cache
+(
+    workflow_id         INT NOT NULL,
+    global_port_id      VARCHAR(200) NOT NULL,
+    subdag_hash         CHAR(64) NOT NULL,
+    fingerprint_json    TEXT NOT NULL,
+    result_uri          TEXT NOT NULL,
+    tuple_count         BIGINT,
+    source_execution_id BIGINT,
+    updated_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
+    PRIMARY KEY (workflow_id, global_port_id, subdag_hash),
+    FOREIGN KEY (workflow_id) REFERENCES workflow(wid) ON DELETE CASCADE
+);
+
+COMMIT;

Reply via email to