This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit a6f41909de029b29793ed4b34615ab027207a599 Author: Xiaozhen Liu <[email protected]> AuthorDate: Mon Nov 24 21:22:33 2025 -0800 feat(cache): add operator port cache table, fingerprint util, and persist on port completion --- .../controller/ControllerProcessor.scala | 2 +- .../promisehandlers/PortCompletedHandler.scala | 43 ++++- .../user/workflow/WorkflowExecutionsResource.scala | 51 ++++++ .../texera/web/service/WorkflowService.scala | 10 +- .../scheduling/FingerprintUtilSpec.scala | 138 +++++++++++++++ .../core/workflow/cache/FingerprintUtil.scala | 192 +++++++++++++++++++++ sql/texera_ddl.sql | 16 ++ sql/updates/16.sql | 45 +++++ 8 files changed, 482 insertions(+), 15 deletions(-) diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala index 2f7342fa3e..a07066390e 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerProcessor.scala @@ -34,7 +34,7 @@ import org.apache.amber.engine.architecture.worker.WorkflowWorker.MainThreadDele import org.apache.amber.engine.common.ambermessage.WorkflowFIFOMessage class ControllerProcessor( - workflowContext: WorkflowContext, + val workflowContext: WorkflowContext, controllerConfig: ControllerConfig, actorId: ActorVirtualIdentity, outputHandler: Either[MainThreadDelegateMessage, WorkflowFIFOMessage] => Unit diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala index 8c0b909af5..c4de572761 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala @@ -21,19 +21,15 @@ package org.apache.amber.engine.architecture.controller.promisehandlers import com.twitter.util.Future import org.apache.amber.core.WorkflowRuntimeException +import org.apache.amber.core.storage.DocumentFactory import org.apache.amber.core.workflow.GlobalPortIdentity -import org.apache.amber.engine.architecture.controller.{ - ControllerAsyncRPCHandlerInitializer, - FatalError -} -import org.apache.amber.engine.architecture.rpc.controlcommands.{ - AsyncRPCContext, - PortCompletedRequest, - QueryStatisticsRequest -} +import org.apache.amber.core.workflow.cache.FingerprintUtil +import org.apache.amber.engine.architecture.controller.{ControllerAsyncRPCHandlerInitializer, FatalError} +import org.apache.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, PortCompletedRequest, QueryStatisticsRequest} import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn import org.apache.amber.engine.common.virtualidentity.util.CONTROLLER import org.apache.amber.util.VirtualIdentityUtils +import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource /** Notify the completion of a port: * - For input port, it means the worker has finished consuming and processing all the data @@ -74,6 +70,35 @@ trait PortCompletedHandler { else operatorExecution.isOutputPortCompleted(msg.portId) if (isPortCompleted) { + // If this is an output port and materialized, persist cache metadata. + if (!msg.input) { + val storageUriOpt = + WorkflowExecutionsResource.getResultUriByPhysicalPortId( + cp.workflowContext.executionId, + globalPortId + ) + (storageUriOpt, Option(cp.workflowScheduler.physicalPlan)) match { + case (Some(uri), Some(plan)) => + val fingerprint = FingerprintUtil.computeSubdagFingerprint(plan, globalPortId) + val tupleCount = + try { + Some(DocumentFactory.openDocument(uri)._1.getCount) + } catch { + case _: Throwable => None + } + WorkflowExecutionsResource.upsertOperatorPortCache( + cp.workflowContext.workflowId, + globalPortId, + fingerprint.subdagHash, + fingerprint.fingerprintJson, + uri, + tupleCount, + Some(cp.workflowContext.executionId) + ) + case _ => // no-op if plan or URI is missing + } + } + cp.workflowExecutionCoordinator .coordinateRegionExecutors(cp.actorService) // Since this message is sent from a worker, any exception from the above code will be returned to that worker. diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index ab31ab0cd2..8321b0c203 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -50,6 +50,7 @@ import javax.ws.rs._ import javax.ws.rs.core.{MediaType, Response} import scala.collection.mutable import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters._ object WorkflowExecutionsResource { final private lazy val context = SqlServer @@ -469,6 +470,39 @@ object WorkflowExecutionsResource { } } + /** + * Upsert cache entry for an operator port. + */ + def upsertOperatorPortCache( + workflowId: WorkflowIdentity, + globalPortId: GlobalPortIdentity, + subdagHash: String, + fingerprintJson: String, + resultUri: java.net.URI, + tupleCount: Option[Long], + sourceExecutionId: Option[ExecutionIdentity] + ): Unit = { + val record = context.newRecord(OPERATOR_PORT_CACHE) + record.setWorkflowId(workflowId.id.toInt) + record.setGlobalPortId(globalPortId.serializeAsString) + record.setSubdagHash(subdagHash) + record.setFingerprintJson(fingerprintJson) + record.setResultUri(resultUri.toString) + tupleCount.foreach(c => record.setTupleCount(Long.box(c))) + sourceExecutionId.foreach(eid => record.setSourceExecutionId(Long.box(eid.id))) + + context + .insertInto(OPERATOR_PORT_CACHE) + .set(record) + .onConflict(OPERATOR_PORT_CACHE.WORKFLOW_ID, OPERATOR_PORT_CACHE.GLOBAL_PORT_ID, OPERATOR_PORT_CACHE.SUBDAG_HASH) + .doUpdate() + .set(OPERATOR_PORT_CACHE.RESULT_URI, record.getResultUri) + .set(OPERATOR_PORT_CACHE.FINGERPRINT_JSON, record.getFingerprintJson) + .set(OPERATOR_PORT_CACHE.TUPLE_COUNT, record.getTupleCount) + .set(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID, record.getSourceExecutionId) + .execute() + } + /** * This method is mainly used for frontend requests. Given a logicalOpId and an outputPortId of an execution, * this method finds the URI for a globalPortId that both: 1. matches the logicalOpId and outputPortId, and @@ -503,6 +537,23 @@ object WorkflowExecutionsResource { urisOfEid.find(isMatchingExternalPortURI) } + /** + * Lookup a result URI by executionId and physical port id. + */ + def getResultUriByPhysicalPortId( + eid: ExecutionIdentity, + globalPortId: GlobalPortIdentity + ): Option[URI] = { + context + .select(OPERATOR_PORT_EXECUTIONS.RESULT_URI) + .from(OPERATOR_PORT_EXECUTIONS) + .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt)) + .and(OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID.eq(globalPortId.serializeAsString)) + .fetchOptionalInto(classOf[String]) + .toScala + .map(URI.create) + } + case class WorkflowExecutionEntry( eId: Integer, vId: Integer, diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala index 01c66fb458..74244870d8 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala @@ -192,11 +192,11 @@ class WorkflowService( var controllerConf = ControllerConfig.default // clean up results from previous run - val previousExecutionId = - WorkflowExecutionService.getLatestExecutionId(workflowId, req.computingUnitId) - previousExecutionId.foreach(eid => { - clearExecutionResources(eid) - }) // TODO: change this behavior after enabling cache. +// val previousExecutionId = +// WorkflowExecutionService.getLatestExecutionId(workflowId, req.computingUnitId) +// previousExecutionId.foreach(eid => { +// clearExecutionResources(eid) +// }) // TODO: change this behavior after enabling cache. workflowContext.executionId = ExecutionsMetadataPersistService.insertNewExecution( workflowContext.workflowId, diff --git a/amber/src/test/scala/org/apache/amber/engine/architecture/scheduling/FingerprintUtilSpec.scala b/amber/src/test/scala/org/apache/amber/engine/architecture/scheduling/FingerprintUtilSpec.scala new file mode 100644 index 0000000000..b85c06801e --- /dev/null +++ b/amber/src/test/scala/org/apache/amber/engine/architecture/scheduling/FingerprintUtilSpec.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.amber.engine.architecture.scheduling + +import org.apache.amber.core.workflow.{GlobalPortIdentity, PortIdentity, WorkflowContext} +import org.apache.amber.core.workflow.cache.FingerprintUtil +import org.apache.amber.engine.e2e.TestUtils.buildWorkflow +import org.apache.amber.operator.TestOperators +import org.apache.amber.operator.aggregate.{AggregateOpDesc, AggregationFunction} +import org.apache.amber.operator.keywordSearch.KeywordSearchOpDesc +import org.apache.amber.operator.source.scan.csv.CSVScanSourceOpDesc +import org.apache.texera.workflow.LogicalLink +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class FingerprintUtilSpec extends AnyFlatSpec with Matchers { + + private def newCsv(): CSVScanSourceOpDesc = + TestOperators.headerlessSmallCsvScanOpDesc() + + private def newKeyword(pattern: String = "Asia", logicalId: Option[String] = None): KeywordSearchOpDesc = { + val op = TestOperators.keywordSearchOpDesc("column-1", pattern) + logicalId.foreach(id => op.setOperatorId(id)) + op + } + + private def newGroupBy(): AggregateOpDesc = + TestOperators.aggregateAndGroupByDesc("column-1", AggregationFunction.COUNT, List[String]()) + + private def buildSimpleWorkflow(csv: CSVScanSourceOpDesc, keyword: KeywordSearchOpDesc) = + buildWorkflow( + List(csv, keyword), + List( + LogicalLink( + csv.operatorIdentifier, + PortIdentity(0), + keyword.operatorIdentifier, + PortIdentity(0) + ) + ), + new WorkflowContext() + ) + + "FingerprintUtil" should "produce stable hash for identical workflows" in { + val csv = newCsv() + val keyword = newKeyword() + val workflow = buildSimpleWorkflow(csv, keyword) + val target = GlobalPortIdentity( + workflow.physicalPlan.getPhysicalOpsOfLogicalOp(keyword.operatorIdentifier).head.id, + PortIdentity(0, internal = false) + ) + val f1 = FingerprintUtil.computeSubdagFingerprint(workflow.physicalPlan, target) + val f2 = FingerprintUtil.computeSubdagFingerprint(workflow.physicalPlan, target) + f1.subdagHash shouldEqual f2.subdagHash + f1.fingerprintJson shouldEqual f2.fingerprintJson + } + + it should "change hash when operator configuration changes" in { + val csv = newCsv() + val keyword = newKeyword() + val workflow = buildSimpleWorkflow(csv, keyword) + val target = GlobalPortIdentity( + workflow.physicalPlan.getPhysicalOpsOfLogicalOp(keyword.operatorIdentifier).head.id, + PortIdentity(0, internal = false) + ) + // modify keyword search pattern by building a new operator desc + val modifiedKeyword = newKeyword("Europe", logicalId = Some(keyword.operatorIdentifier.id)) + val modifiedWorkflow = buildWorkflow( + List(csv, modifiedKeyword), + List( + LogicalLink( + csv.operatorIdentifier, + PortIdentity(0), + modifiedKeyword.operatorIdentifier, + PortIdentity(0) + ) + ), + new WorkflowContext() + ) + val f1 = FingerprintUtil.computeSubdagFingerprint(workflow.physicalPlan, target) + val f2 = FingerprintUtil.computeSubdagFingerprint(modifiedWorkflow.physicalPlan, target) + f1.subdagHash should not equal f2.subdagHash + } + + it should "change hash when wiring changes" in { + val csv = newCsv() + val keyword = newKeyword() + val groupBy = newGroupBy() + val workflow = buildWorkflow( + List(csv, keyword, groupBy), + List( + LogicalLink( + csv.operatorIdentifier, + PortIdentity(0), + keyword.operatorIdentifier, + PortIdentity(0) + ), + LogicalLink( + keyword.operatorIdentifier, + PortIdentity(0), + groupBy.operatorIdentifier, + PortIdentity(0) + ) + ), + new WorkflowContext() + ) + + val targetKeyword = GlobalPortIdentity( + workflow.physicalPlan.getPhysicalOpsOfLogicalOp(keyword.operatorIdentifier).head.id, + PortIdentity(0, internal = false) + ) + val targetGroupBy = GlobalPortIdentity( + workflow.physicalPlan.getPhysicalOpsOfLogicalOp(groupBy.operatorIdentifier).head.id, + PortIdentity(0, internal = false) + ) + + val fKeyword = FingerprintUtil.computeSubdagFingerprint(workflow.physicalPlan, targetKeyword) + val fGroupBy = FingerprintUtil.computeSubdagFingerprint(workflow.physicalPlan, targetGroupBy) + fKeyword.subdagHash should not equal fGroupBy.subdagHash + } +} diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/workflow/cache/FingerprintUtil.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/workflow/cache/FingerprintUtil.scala new file mode 100644 index 0000000000..0ec50917f5 --- /dev/null +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/workflow/cache/FingerprintUtil.scala @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.amber.core.workflow.cache + +import com.fasterxml.jackson.annotation.JsonInclude +import com.fasterxml.jackson.databind.SerializationFeature +import com.fasterxml.jackson.databind.node.ObjectNode +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.amber.core.executor.OpExecInitInfo +import org.apache.amber.core.workflow.{GlobalPortIdentity, PhysicalLink, PhysicalOp, PhysicalPlan} + +import java.nio.charset.StandardCharsets +import java.security.MessageDigest + +case class Fingerprint(fingerprintJson: String, subdagHash: String) + +/** + * Computes deterministic fingerprints for upstream subDAGs to back cache keys. + * + * The fingerprint payload captures: + * - target output port, + * - all upstream physical operators (sorted), + * - their exec init info (proto string), + * - their output schemas (string form when available), + * - all edges between those operators (sorted). + * + * The payload is serialized with ordered keys and hashed via SHA-256. Identical input + * plans produce identical hashes; any change in structure or config flips the hash. + */ +object FingerprintUtil { + + /** + * Compute a deterministic fingerprint of the upstream subDAG for the given output port. + * The payload is canonical (sorted keys) and hashed with SHA-256 to generate subdagHash. + */ + def computeSubdagFingerprint( + plan: PhysicalPlan, + target: GlobalPortIdentity + ): Fingerprint = { + val subdag = collectUpstream(plan, target) + val payload = buildCanonicalPayload(subdag.nodes, subdag.links, target) + val fingerprintJson = objectMapper.writeValueAsString(payload) + Fingerprint(fingerprintJson, sha256Hex(fingerprintJson)) + } + + private case class SubDag(nodes: Set[PhysicalOp], links: Set[PhysicalLink]) + + /** + * Collect all operators and links reachable upstream of the target port's operator. + * This is limited to the connected component feeding the target. + */ + private def collectUpstream(plan: PhysicalPlan, target: GlobalPortIdentity): SubDag = { + val visitedOps = scala.collection.mutable.Set(target.opId) + val visitedLinks = scala.collection.mutable.Set[PhysicalLink]() + val queue = scala.collection.mutable.Queue(target.opId) + + while (queue.nonEmpty) { + val current = queue.dequeue() + val upstreamLinks = plan.getUpstreamPhysicalLinks(current) + upstreamLinks.foreach(link => { + visitedLinks.add(link) + if (!visitedOps.contains(link.fromOpId)) { + visitedOps.add(link.fromOpId) + queue.enqueue(link.fromOpId) + } + }) + } + + SubDag( + nodes = visitedOps.map(plan.getOperator).toSet, + links = visitedLinks.toSet.filter(link => + visitedOps.contains(link.fromOpId) && visitedOps.contains(link.toOpId) + ) + ) + } + + /** + * Build a canonical JSON payload describing the subDAG: + * - target port + * - sorted nodes with exec info and schemas + * - sorted edges + */ + private def buildCanonicalPayload( + nodes: Set[PhysicalOp], + links: Set[PhysicalLink], + target: GlobalPortIdentity + ): ObjectNode = { + val root = objectMapper.createObjectNode() + root.put("targetPort", target.toString) + + val nodeArray = objectMapper.createArrayNode() + nodes.toList + .sortBy(_.id.toString) + .foreach(op => nodeArray.add(buildNode(op))) + root.set("nodes", nodeArray) + + val edgeArray = objectMapper.createArrayNode() + links.toList + .sortBy(link => (link.fromOpId.toString, link.fromPortId.id, link.toOpId.toString, link.toPortId.id)) + .foreach(link => edgeArray.add(buildEdge(link))) + root.set("edges", edgeArray) + + root + } + + /** + * Serialize a physical operator into a deterministic JSON node. + * Captures IDs, exec init info, and output schemas. + */ + private def buildNode(op: PhysicalOp): ObjectNode = { + val node = objectMapper.createObjectNode() + node.put("physicalOpId", op.id.toString) + node.put("logicalOpId", op.id.logicalOpId.toString) + node.set("opExec", serializeOpExec(op.opExecInitInfo)) + + val schemaArray = objectMapper.createArrayNode() + op.outputPorts.toList + .sortBy(_._1.id) + .foreach { + case (portId, (_, _, schemaEither)) => + val schemaNode = objectMapper.createObjectNode() + schemaNode.put("portId", portId.id) + schemaNode.put("internal", portId.internal) + schemaEither.toOption match { + case Some(schema) => + schemaNode.put("available", true) + schemaNode.put("schemaString", schema.toString) + case None => + schemaNode.put("available", false) + } + schemaArray.add(schemaNode) + } + node.set("outputSchemas", schemaArray) + + node + } + + /** + * Serialize a physical link into a deterministic JSON node. + */ + private def buildEdge(link: PhysicalLink): ObjectNode = { + val edge = objectMapper.createObjectNode() + edge.put("fromOpId", link.fromOpId.toString) + edge.put("fromPortId", link.fromPortId.id) + edge.put("fromInternal", link.fromPortId.internal) + edge.put("toOpId", link.toOpId.toString) + edge.put("toPortId", link.toPortId.id) + edge.put("toInternal", link.toPortId.internal) + edge + } + + private val objectMapper: ObjectMapper = { + val mapper = new ObjectMapper() + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) + mapper.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS) + mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false) + mapper + } + + private def sha256Hex(value: String): String = { + val digest = MessageDigest.getInstance("SHA-256") + val bytes = digest.digest(value.getBytes(StandardCharsets.UTF_8)) + bytes.map("%02x".format(_)).mkString + } + + /** + * Serialize op init info deterministically using its proto string. + */ + private def serializeOpExec(opExecInitInfo: OpExecInitInfo): ObjectNode = { + val n = objectMapper.createObjectNode() + // Use canonical proto string for determinism. + n.put("protoString", opExecInitInfo.asMessage.toProtoString) + n + } +} diff --git a/sql/texera_ddl.sql b/sql/texera_ddl.sql index 7b0f9b9063..a112413a4a 100644 --- a/sql/texera_ddl.sql +++ b/sql/texera_ddl.sql @@ -48,6 +48,7 @@ SET search_path TO texera_db, public; -- ============================================ DROP TABLE IF EXISTS operator_executions CASCADE; DROP TABLE IF EXISTS operator_port_executions CASCADE; +DROP TABLE IF EXISTS operator_port_cache CASCADE; DROP TABLE IF EXISTS workflow_user_access CASCADE; DROP TABLE IF EXISTS workflow_of_user CASCADE; DROP TABLE IF EXISTS user_config CASCADE; @@ -296,6 +297,21 @@ CREATE TABLE operator_port_executions FOREIGN KEY (workflow_execution_id) REFERENCES workflow_executions(eid) ON DELETE CASCADE ); +-- operator_port_cache +CREATE TABLE operator_port_cache +( + workflow_id INT NOT NULL, + global_port_id VARCHAR(200) NOT NULL, + subdag_hash CHAR(64) NOT NULL, + fingerprint_json TEXT NOT NULL, + result_uri TEXT NOT NULL, + tuple_count BIGINT, + source_execution_id BIGINT, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (workflow_id, global_port_id, subdag_hash), + FOREIGN KEY (workflow_id) REFERENCES workflow(wid) ON DELETE CASCADE +); + -- workflow_user_likes CREATE TABLE IF NOT EXISTS workflow_user_likes ( diff --git a/sql/updates/16.sql b/sql/updates/16.sql new file mode 100644 index 0000000000..6276738d72 --- /dev/null +++ b/sql/updates/16.sql @@ -0,0 +1,45 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ============================================ +-- 1. Connect to the texera_db database +-- ============================================ +\c texera_db + +SET search_path TO texera_db; + +-- ============================================ +-- 2. Update the table schema +-- ============================================ +BEGIN; + +-- Cache table keyed by workflow + global port + subdag hash. +CREATE TABLE IF NOT EXISTS operator_port_cache +( + workflow_id INT NOT NULL, + global_port_id VARCHAR(200) NOT NULL, + subdag_hash CHAR(64) NOT NULL, + fingerprint_json TEXT NOT NULL, + result_uri TEXT NOT NULL, + tuple_count BIGINT, + source_execution_id BIGINT, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (workflow_id, global_port_id, subdag_hash), + FOREIGN KEY (workflow_id) REFERENCES workflow(wid) ON DELETE CASCADE +); + +COMMIT;
