This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit acaf348c5fb099decc4c476b5c137378bccf99ad Author: Xiaozhen Liu <[email protected]> AuthorDate: Mon Jan 12 19:27:19 2026 -0800 feat(cache): refactor amber client-side services and cache upsert logic. --- .../architecture/controller/ClientEvent.scala | 6 + .../promisehandlers/PortCompletedHandler.scala | 32 +- .../texera/web/dao/OperatorPortCacheDao.scala | 147 +++++++++ .../texera/web/service/ExecutionCacheService.scala | 70 ++++ .../web/service/OperatorPortCacheService.scala | 144 ++++++++ .../web/service/WorkflowExecutionService.scala | 27 +- .../texera/web/service/WorkflowService.scala | 7 + docs/operator-port-cache.md | 363 +++++---------------- 8 files changed, 469 insertions(+), 327 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala index ea83eedd2a..9848ba5b0a 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala @@ -46,3 +46,9 @@ case class UpdateExecutorCompleted(id: ActorVirtualIdentity) extends ClientEvent final case class ReplayStatusUpdate(id: ActorVirtualIdentity, status: Boolean) extends ClientEvent final case class WorkflowRecoveryStatus(isRecovering: Boolean) extends ClientEvent + +case class PortMaterialized( + portId: org.apache.texera.amber.core.workflow.GlobalPortIdentity, + resultUri: java.net.URI, + tupleCount: Option[Long] +) extends ClientEvent diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala index a2c7894bc2..a613c7cef9 100644 --- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala @@ -23,8 +23,7 @@ import com.twitter.util.Future import org.apache.texera.amber.core.WorkflowRuntimeException import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.workflow.GlobalPortIdentity -import org.apache.texera.amber.core.workflow.cache.FingerprintUtil -import org.apache.texera.amber.engine.architecture.controller.{ControllerAsyncRPCHandlerInitializer, FatalError} +import org.apache.texera.amber.engine.architecture.controller.{ControllerAsyncRPCHandlerInitializer, FatalError, PortMaterialized} import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, PortCompletedRequest, QueryStatisticsRequest} import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER @@ -70,32 +69,21 @@ trait PortCompletedHandler { else operatorExecution.isOutputPortCompleted(msg.portId) if (isPortCompleted) { - // If this is an output port and materialized, persist cache metadata. + // If this is an output port and materialized, notify client (for cache upsert). 
if (!msg.input) { val storageUriOpt = WorkflowExecutionsResource.getResultUriByPhysicalPortId( cp.workflowContext.executionId, globalPortId ) - (storageUriOpt, Option(cp.workflowScheduler.physicalPlan)) match { - case (Some(uri), Some(plan)) => - val fingerprint = FingerprintUtil.computeSubdagFingerprint(plan, globalPortId) - val tupleCount = - try { - Some(DocumentFactory.openDocument(uri)._1.getCount) - } catch { - case _: Throwable => None - } - WorkflowExecutionsResource.upsertOperatorPortCache( - cp.workflowContext.workflowId, - globalPortId, - fingerprint.subdagHash, - fingerprint.fingerprintJson, - uri, - tupleCount, - Some(cp.workflowContext.executionId) - ) - case _ => // no-op if plan or URI is missing + storageUriOpt.foreach { uri => + val tupleCount = + try { + Some(DocumentFactory.openDocument(uri)._1.getCount) + } catch { + case _: Throwable => None + } + sendToClient(PortMaterialized(globalPortId, uri, tupleCount)) } } diff --git a/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala new file mode 100644 index 0000000000..f6bf30efb2 --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.dao + +import org.apache.texera.dao.SqlServer +import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_PORT_CACHE +import org.jooq.DSLContext + +import java.net.URI +import scala.jdk.OptionConverters._ + +/** + * Record representing a cache entry in the operator_port_cache table. + * + * @param workflowId Workflow ID + * @param globalPortId Serialized GlobalPortIdentity + * @param subdagHash SHA-256 hash of the upstream subDAG fingerprint + * @param fingerprintJson Canonical JSON of the upstream subDAG + * @param resultUri URI of the materialized output + * @param tupleCount Number of tuples in the cached output (optional) + * @param sourceExecutionId Execution ID that produced this cache entry (optional) + * + * Note: updated_at timestamp is managed by the database (DEFAULT now()) + */ +case class OperatorPortCacheRecord( + workflowId: Long, + globalPortId: String, + subdagHash: String, + fingerprintJson: String, + resultUri: URI, + tupleCount: Option[Long], + sourceExecutionId: Option[Long] +) + +/** + * Data Access Object for operator_port_cache table. + * Provides low-level CRUD operations using Jooq. + * + * @param sqlServer SqlServer instance for database access + */ +class OperatorPortCacheDao(sqlServer: SqlServer) { + private val context: DSLContext = sqlServer.createDSLContext() + + /** + * Retrieve a cache entry by primary key (workflow_id, global_port_id, subdag_hash). 
+ * + * @param workflowId Workflow ID + * @param serializedPortId Serialized GlobalPortIdentity string + * @param subdagHash SHA-256 hash of the upstream subDAG fingerprint + * @return Some(OperatorPortCacheRecord) if found, None otherwise + */ + def get( + workflowId: Long, + serializedPortId: String, + subdagHash: String + ): Option[OperatorPortCacheRecord] = { + context + .select( + OPERATOR_PORT_CACHE.WORKFLOW_ID, + OPERATOR_PORT_CACHE.GLOBAL_PORT_ID, + OPERATOR_PORT_CACHE.SUBDAG_HASH, + OPERATOR_PORT_CACHE.FINGERPRINT_JSON, + OPERATOR_PORT_CACHE.RESULT_URI, + OPERATOR_PORT_CACHE.TUPLE_COUNT, + OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID + ) + .from(OPERATOR_PORT_CACHE) + .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt)) + .and(OPERATOR_PORT_CACHE.GLOBAL_PORT_ID.eq(serializedPortId)) + .and(OPERATOR_PORT_CACHE.SUBDAG_HASH.eq(subdagHash)) + .fetchOptional() + .toScala + .map { record => + OperatorPortCacheRecord( + workflowId = record.value1().longValue(), + globalPortId = record.value2(), + subdagHash = record.value3(), + fingerprintJson = record.value4(), + resultUri = URI.create(record.value5()), + tupleCount = Option(record.value6()).map(_.longValue()), + sourceExecutionId = Option(record.value7()).map(_.longValue()) + ) + } + } + + /** + * Insert or update a cache entry (upsert). + * On conflict (workflow_id, global_port_id, subdag_hash), updates the existing record. 
+ * + * @param record OperatorPortCacheRecord to insert/update + */ + def upsert(record: OperatorPortCacheRecord): Unit = { + val dbRecord = context.newRecord(OPERATOR_PORT_CACHE) + dbRecord.setWorkflowId(record.workflowId.toInt) + dbRecord.setGlobalPortId(record.globalPortId) + dbRecord.setSubdagHash(record.subdagHash) + dbRecord.setFingerprintJson(record.fingerprintJson) + dbRecord.setResultUri(record.resultUri.toString) + record.tupleCount.foreach(c => dbRecord.setTupleCount(Long.box(c))) + record.sourceExecutionId.foreach(eid => dbRecord.setSourceExecutionId(Long.box(eid))) + + context + .insertInto(OPERATOR_PORT_CACHE) + .set(dbRecord) + .onConflict( + OPERATOR_PORT_CACHE.WORKFLOW_ID, + OPERATOR_PORT_CACHE.GLOBAL_PORT_ID, + OPERATOR_PORT_CACHE.SUBDAG_HASH + ) + .doUpdate() + .set(OPERATOR_PORT_CACHE.RESULT_URI, dbRecord.getResultUri) + .set(OPERATOR_PORT_CACHE.FINGERPRINT_JSON, dbRecord.getFingerprintJson) + .set(OPERATOR_PORT_CACHE.TUPLE_COUNT, dbRecord.getTupleCount) + .set(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID, dbRecord.getSourceExecutionId) + .execute() + } + + /** + * Delete all cache entries for a specific workflow. + * Useful for cache invalidation when a workflow is deleted or manually cleared. + * + * @param workflowId Workflow ID whose cache entries should be deleted + */ + def deleteByWorkflow(workflowId: Long): Unit = { + context + .deleteFrom(OPERATOR_PORT_CACHE) + .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt)) + .execute() + } +} diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala new file mode 100644 index 0000000000..ee5ceeafe9 --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.service + +import com.typesafe.scalalogging.LazyLogging +import org.apache.texera.amber.core.workflow.{PhysicalPlan, WorkflowContext} +import org.apache.texera.amber.engine.architecture.controller.PortMaterialized +import org.apache.texera.amber.engine.common.client.AmberClient +import org.apache.texera.web.SubscriptionManager + +/** + * Service that listens for port materialization events from the controller + * and persists cache metadata. 
+ * + * @param client AmberClient to register callbacks + * @param cacheService OperatorPortCacheService for cache persistence + * @param workflowContext WorkflowContext for workflow/execution IDs + * @param physicalPlan PhysicalPlan for fingerprint computation + */ +class ExecutionCacheService( + client: AmberClient, + cacheService: OperatorPortCacheService, + workflowContext: WorkflowContext, + physicalPlan: PhysicalPlan +) extends SubscriptionManager + with LazyLogging { + + registerCallbacks() + + private def registerCallbacks(): Unit = { + addSubscription( + client + .registerCallback[PortMaterialized]((evt: PortMaterialized) => { + logger.info( + s"Port materialized: ${evt.portId}, URI: ${evt.resultUri}, tuple count: ${evt.tupleCount}" + ) + try { + cacheService.upsertCachedOutput( + workflowContext.workflowId, + workflowContext.executionId, + evt.portId, + physicalPlan, + evt.resultUri, + evt.tupleCount + ) + } catch { + case e: Throwable => + logger.error(s"Failed to upsert cache for port ${evt.portId}", e) + } + }) + ) + } +} diff --git a/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala new file mode 100644 index 0000000000..9e0155e3a4 --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.service + +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.cache.FingerprintUtil +import org.apache.texera.amber.core.workflow.{CachedOutput, GlobalPortIdentity, PhysicalPlan} +import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps +import org.apache.texera.web.dao.{OperatorPortCacheDao, OperatorPortCacheRecord} + +import java.net.URI + +/** + * Service for operator port result caching. + * Provides high-level cache operations with business logic for workflow execution. + * + * Key responsibilities: + * - Batch lookup of cached outputs at workflow submission time + * - Cache entry creation when output ports complete + * - Fingerprint computation and serialization + * - Cache invalidation and lifecycle management + * + * @param dao OperatorPortCacheDao for database access + */ +class OperatorPortCacheService(dao: OperatorPortCacheDao) { + + /** + * Lookup cached outputs for all materializable ports in the physical plan. + * Called at workflow submission time by WorkflowExecutionService. + * + * For each output port in the plan: + * 1. Compute fingerprint of upstream subDAG + * 2. Query cache by (workflow_id, port_id, fingerprint_hash) + * 3. 
Collect all cache hits + * + * @param workflowId Workflow ID to lookup cache for + * @param physicalPlan Physical plan containing operators and ports + * @return Map from GlobalPortIdentity to CachedOutput for all cache hits + */ + def lookupCachedOutputs( + workflowId: WorkflowIdentity, + physicalPlan: PhysicalPlan + ): Map[GlobalPortIdentity, CachedOutput] = { + physicalPlan.operators + .flatMap(op => op.outputPorts.keys.map(pid => GlobalPortIdentity(op.id, pid))) + .flatMap { gpid => + val fingerprint = FingerprintUtil.computeSubdagFingerprint(physicalPlan, gpid) + dao.get(workflowId.id, gpid.serializeAsString, fingerprint.subdagHash).map { record => + gpid -> CachedOutput( + resultUri = record.resultUri, + fingerprintJson = record.fingerprintJson, + tupleCount = record.tupleCount, + sourceExecutionId = record.sourceExecutionId.map(ExecutionIdentity(_)) + ) + } + } + .toMap + } + + /** + * Upsert cache entry when an output port completes. + * Called by ExecutionCacheService when it receives a PortMaterialized event, which PortCompletedHandler emits once a materialized output is produced. + * + * Steps: + * 1. Compute fingerprint of upstream subDAG + * 2.
Upsert to operator_port_cache table with fingerprint, URI, metadata + * + * @param workflowId Workflow ID + * @param executionId Execution ID that produced this output + * @param portId GlobalPortIdentity of the completed port + * @param physicalPlan Physical plan (needed for fingerprint computation) + * @param resultUri URI where the materialized output is stored + * @param tupleCount Number of tuples in the output (optional, best-effort) + */ + def upsertCachedOutput( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity, + portId: GlobalPortIdentity, + physicalPlan: PhysicalPlan, + resultUri: URI, + tupleCount: Option[Long] + ): Unit = { + val fingerprint = FingerprintUtil.computeSubdagFingerprint(physicalPlan, portId) + + dao.upsert( + OperatorPortCacheRecord( + workflowId = workflowId.id, + globalPortId = portId.serializeAsString, + subdagHash = fingerprint.subdagHash, + fingerprintJson = fingerprint.fingerprintJson, + resultUri = resultUri, + tupleCount = tupleCount, + sourceExecutionId = Some(executionId.id) + ) + ) + } + + /** + * Invalidate all cache entries for a workflow. + * Useful for: + * - Manual cache clearing via REST API + * - Workflow deletion (cleanup) + * - Testing + * + * @param workflowId Workflow ID whose cache entries should be deleted + */ + def invalidateWorkflowCache(workflowId: WorkflowIdentity): Unit = { + dao.deleteByWorkflow(workflowId.id) + } + + /** + * Future: Cost-aware eviction when storage quota is exceeded. + * Phase 3: Lifecycle management research. 
+ * + * Proposed approach: + * - Calculate recompute_cost / storage_cost ratio for each cache entry + * - Evict entries with lowest ratio first + * - Use runtime_statistics table for cost estimation + * + * @param quotaBytes Storage quota in bytes + */ + def evictLowValueEntries(quotaBytes: Long): Unit = { + throw new UnsupportedOperationException( + "Cost-aware eviction not yet implemented (Phase 3: Lifecycle management)" + ) + } +} diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala index ade57d403b..38eef83ad6 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala @@ -28,14 +28,9 @@ import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAg import org.apache.texera.amber.engine.common.Utils import org.apache.texera.amber.engine.common.client.AmberClient import org.apache.texera.amber.engine.common.executionruntimestate.ExecutionMetadataStore -import org.apache.texera.amber.core.workflow.cache.FingerprintUtil import org.apache.texera.amber.core.workflow.{CachedOutput, GlobalPortIdentity} import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps -import org.apache.texera.web.model.websocket.event.{ - TexeraWebSocketEvent, - WorkflowErrorEvent, - WorkflowStateEvent -} +import org.apache.texera.web.model.websocket.event.{TexeraWebSocketEvent, WorkflowErrorEvent, WorkflowStateEvent} import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource import org.apache.texera.web.storage.ExecutionStateStore @@ -61,6 +56,7 @@ class WorkflowExecutionService( controllerConfig: ControllerConfig, val workflowContext: WorkflowContext, resultService: ExecutionResultService, + cacheService: 
OperatorPortCacheService, request: WorkflowExecuteRequest, val executionStateStore: ExecutionStateStore, errorHandler: Throwable => Unit, @@ -105,22 +101,14 @@ class WorkflowExecutionService( var executionStatsService: ExecutionStatsService = _ var executionRuntimeService: ExecutionRuntimeService = _ var executionConsoleService: ExecutionConsoleService = _ + var executionCacheService: ExecutionCacheService = _ private def computeCachedOutputs( physicalPlan: org.apache.texera.amber.core.workflow.PhysicalPlan ): Map[String, CachedOutput] = { - physicalPlan.operators - .flatMap(op => op.outputPorts.keys.map(pid => GlobalPortIdentity(op.id, pid))) - .flatMap { gpid => - val fingerprint = FingerprintUtil.computeSubdagFingerprint(physicalPlan, gpid) - WorkflowExecutionsResource - .getOperatorPortCache(workflowContext.workflowId, gpid, fingerprint.subdagHash) - .map { - case (uri, tupleCount, fpJson, sourceEid) => - gpid.serializeAsString -> CachedOutput(uri, fpJson, tupleCount, sourceEid) - } - } - .toMap + cacheService + .lookupCachedOutputs(workflowContext.workflowId, physicalPlan) + .map { case (gpid, cached) => gpid.serializeAsString -> cached } } def executeWorkflow(): Unit = { @@ -144,6 +132,8 @@ class WorkflowExecutionService( executionReconfigurationService = new ExecutionReconfigurationService(client, executionStateStore, workflow) executionStatsService = new ExecutionStatsService(client, executionStateStore, workflow.context) + executionCacheService = + new ExecutionCacheService(client, cacheService, workflow.context, workflow.physicalPlan) executionRuntimeService = new ExecutionRuntimeService( client, executionStateStore, @@ -196,6 +186,7 @@ class WorkflowExecutionService( executionRuntimeService.unsubscribeAll() executionConsoleService.unsubscribeAll() executionStatsService.unsubscribeAll() + executionCacheService.unsubscribeAll() executionReconfigurationService.unsubscribeAll() } diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala index e35a707aeb..1756ff8032 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala @@ -107,6 +107,12 @@ class WorkflowService( val resultService: ExecutionResultService = new ExecutionResultService(workflowId, computingUnitId, stateStore) + val cacheService: OperatorPortCacheService = { + val dao = new org.apache.texera.web.dao.OperatorPortCacheDao( + org.apache.texera.dao.SqlServer.getInstance() + ) + new OperatorPortCacheService(dao) + } val lifeCycleManager: WorkflowLifecycleManager = new WorkflowLifecycleManager( s"workflowId=$workflowId", cleanUpTimeout, @@ -275,6 +281,7 @@ class WorkflowService( controllerConf, workflowContext, resultService, + cacheService, req, executionStateStore, errorHandler, diff --git a/docs/operator-port-cache.md b/docs/operator-port-cache.md index cc1d760437..f77fe2653c 100644 --- a/docs/operator-port-cache.md +++ b/docs/operator-port-cache.md @@ -51,9 +51,10 @@ V1 assumes unlimited storage. Eviction, TTL, and garbage collection are research - `fingerprint_json`: canonical JSON of the upstream sub‑DAG. - `subdag_hash`: SHA-256 of `fingerprint_json`. - `result_uri`: materialization URI. - - `tuple_count` (optional), `source_execution_id` (optional), timestamps. + - `tuple_count` (optional), `source_execution_id` (optional). + - `updated_at`: TIMESTAMPTZ managed by database (DEFAULT now()). - `global_port_id` uses existing `GlobalPortIdentity` serialization. -- Status: schema + migration added (`sql/texera_ddl.sql`, `sql/updates/16.sql`). +- Status: schema + migration added (`sql/updates/cache.sql`). ## Fingerprint - Utility: `FingerprintUtil.computeSubdagFingerprint(physicalPlan, globalPortId) -> (fingerprintJson, subdagHash)`. 
@@ -122,18 +123,24 @@ Entry point: `RegionExecutionCoordinator` constructor branches on `region.cached 7. **Set phase to Completed**: Region lifecycle completes immediately #### ToExecute Regions (Normal Execution) -**Location**: `PortCompletedHandler` → `OperatorPortCacheService` → `OperatorPortCacheDao` +**Location**: `PortCompletedHandler` → `PortMaterialized` event → `ExecutionCacheService` → `OperatorPortCacheService` → `OperatorPortCacheDao` 1. **Execute operators**: Normal execution path via worker actors 2. **On output port completion** (`PortCompletedHandler`): - - Call `OperatorPortCacheService.upsertCachedOutput(...)`: - - Compute fingerprint via `FingerprintUtil.computeSubdagFingerprint(physicalPlan, portId)` - - Retrieve tuple count (best-effort via `DocumentFactory.openDocument(uri).getCount`) - - Upsert to `operator_port_cache` table via DAO with: + - Retrieve result URI from `WorkflowExecutionsResource.getResultUriByPhysicalPortId` + - Retrieve tuple count (best-effort via `DocumentFactory.openDocument(uri)._1.getCount`) + - Send `PortMaterialized(portId, resultUri, tupleCount)` event to client via `sendToClient()` +3. **Service layer** (`ExecutionCacheService`): + - Registered callback via `client.registerCallback[PortMaterialized]` receives event + - Calls `OperatorPortCacheService.upsertCachedOutput(...)`: + - Computes fingerprint via `FingerprintUtil.computeSubdagFingerprint(physicalPlan, portId)` + - Upserts to `operator_port_cache` table via DAO with: - `workflow_id`, `global_port_id`, `subdag_hash` - `fingerprint_json`, `result_uri` - `tuple_count`, `source_execution_id`, timestamps -3. **Normal stats emission**: Real execution metrics sent to client +4. **Normal stats emission**: Real execution metrics sent to client + +**Architecture note**: Event-based communication follows existing controller pattern - handler emits events via `sendToClient()`, service layer registers callbacks to handle them.
Separation of concerns: the engine layer no longer persists cache metadata itself, although `PortCompletedHandler` still resolves the result URI via `WorkflowExecutionsResource.getResultUriByPhysicalPortId`. ### 4. Client-Side State Management **Location**: `ExecutionStatsService`, `ExecutionStateStore` @@ -218,260 +225,25 @@ HTTP endpoints for external access (if needed): **Note**: Internal services use `OperatorPortCacheService`, not the REST resource. -### 6. Implementation Guide +### 6. Implemented Components Reference -#### Step 1: Create OperatorPortCacheDao -**File**: `/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala` +Phase 1.1 Service/DAO architecture is complete. Key components: -```scala -package org.apache.texera.web.dao - -import org.apache.texera.amber.core.virtualidentity.ExecutionIdentity -import org.apache.texera.dao.SqlServer -import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_PORT_CACHE -import org.jooq.DSLContext - -import java.net.URI -import scala.jdk.OptionConverters._ - -case class OperatorPortCacheRecord( - workflowId: Long, - globalPortId: String, - subdagHash: String, - fingerprintJson: String, - resultUri: URI, - tupleCount: Option[Long], - sourceExecutionId: Option[Long], - createdAt: Long, - updatedAt: Long -) +| Component | Location | Purpose | +|-----------|----------|---------| +| **OperatorPortCacheDao** | `/amber/src/main/scala/org/apache/texera/web/dao/OperatorPortCacheDao.scala` | Low-level database access using Jooq. Methods: `get()`, `upsert()`, `deleteByWorkflow()` | +| **OperatorPortCacheService** | `/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala` | High-level cache operations.
Methods: `lookupCachedOutputs()`, `upsertCachedOutput()`, `invalidateWorkflowCache()` | +| **ExecutionCacheService** | `/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala` | Event listener that registers callback for `PortMaterialized` events and bridges to service layer | +| **PortMaterialized Event** | `/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala` | Client event emitted when output port completes with URI and tuple count | -class OperatorPortCacheDao(sqlServer: SqlServer) { - private val context: DSLContext = sqlServer.createDSLContext() +**Integration Points**: +- **WorkflowService**: Instantiates `cacheService` at workflow level (shared across executions) +- **WorkflowExecutionService**: + - Uses `cacheService.lookupCachedOutputs()` at submission time + - Instantiates `executionCacheService` per execution for cache writes +- **PortCompletedHandler**: Emits `PortMaterialized` event via `sendToClient()` when output ports complete - def get( - workflowId: Long, - serializedPortId: String, - subdagHash: String - ): Option[OperatorPortCacheRecord] = { - context - .select( - OPERATOR_PORT_CACHE.WORKFLOW_ID, - OPERATOR_PORT_CACHE.GLOBAL_PORT_ID, - OPERATOR_PORT_CACHE.SUBDAG_HASH, - OPERATOR_PORT_CACHE.FINGERPRINT_JSON, - OPERATOR_PORT_CACHE.RESULT_URI, - OPERATOR_PORT_CACHE.TUPLE_COUNT, - OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID, - OPERATOR_PORT_CACHE.CREATED_AT, - OPERATOR_PORT_CACHE.UPDATED_AT - ) - .from(OPERATOR_PORT_CACHE) - .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt)) - .and(OPERATOR_PORT_CACHE.GLOBAL_PORT_ID.eq(serializedPortId)) - .and(OPERATOR_PORT_CACHE.SUBDAG_HASH.eq(subdagHash)) - .fetchOptional() - .toScala - .map { record => - OperatorPortCacheRecord( - workflowId = record.value1().longValue(), - globalPortId = record.value2(), - subdagHash = record.value3(), - fingerprintJson = record.value4(), - resultUri = URI.create(record.value5()), - tupleCount = 
Option(record.value6()).map(_.longValue()), - sourceExecutionId = Option(record.value7()).map(_.longValue()), - createdAt = record.value8().longValue(), - updatedAt = record.value9().longValue() - ) - } - } - - def upsert(record: OperatorPortCacheRecord): Unit = { - val dbRecord = context.newRecord(OPERATOR_PORT_CACHE) - dbRecord.setWorkflowId(record.workflowId.toInt) - dbRecord.setGlobalPortId(record.globalPortId) - dbRecord.setSubdagHash(record.subdagHash) - dbRecord.setFingerprintJson(record.fingerprintJson) - dbRecord.setResultUri(record.resultUri.toString) - record.tupleCount.foreach(c => dbRecord.setTupleCount(Long.box(c))) - record.sourceExecutionId.foreach(eid => dbRecord.setSourceExecutionId(Long.box(eid))) - - context - .insertInto(OPERATOR_PORT_CACHE) - .set(dbRecord) - .onConflict( - OPERATOR_PORT_CACHE.WORKFLOW_ID, - OPERATOR_PORT_CACHE.GLOBAL_PORT_ID, - OPERATOR_PORT_CACHE.SUBDAG_HASH - ) - .doUpdate() - .set(OPERATOR_PORT_CACHE.RESULT_URI, dbRecord.getResultUri) - .set(OPERATOR_PORT_CACHE.FINGERPRINT_JSON, dbRecord.getFingerprintJson) - .set(OPERATOR_PORT_CACHE.TUPLE_COUNT, dbRecord.getTupleCount) - .set(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID, dbRecord.getSourceExecutionId) - .execute() - } - - def deleteByWorkflow(workflowId: Long): Unit = { - context - .deleteFrom(OPERATOR_PORT_CACHE) - .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(workflowId.toInt)) - .execute() - } -} -``` - -#### Step 2: Create OperatorPortCacheService -**File**: `/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala` - -```scala -package org.apache.texera.web.service - -import org.apache.texera.amber.core.storage.DocumentFactory -import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} -import org.apache.texera.amber.core.workflow.cache.FingerprintUtil -import org.apache.texera.amber.core.workflow.{CachedOutput, GlobalPortIdentity, PhysicalPlan} -import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps 
-import org.apache.texera.web.dao.{OperatorPortCacheDao, OperatorPortCacheRecord} - -import java.net.URI - -class OperatorPortCacheService(dao: OperatorPortCacheDao) { - - /** - * Lookup cached outputs for all materializable ports in the physical plan. - * Called at workflow submission time. - */ - def lookupCachedOutputs( - workflowId: WorkflowIdentity, - physicalPlan: PhysicalPlan - ): Map[GlobalPortIdentity, CachedOutput] = { - physicalPlan.operators - .flatMap(op => op.outputPorts.keys.map(pid => GlobalPortIdentity(op.id, pid))) - .flatMap { gpid => - val fingerprint = FingerprintUtil.computeSubdagFingerprint(physicalPlan, gpid) - dao.get(workflowId.id, gpid.serializeAsString, fingerprint.subdagHash).map { record => - gpid -> CachedOutput( - resultUri = record.resultUri, - fingerprintJson = record.fingerprintJson, - tupleCount = record.tupleCount, - sourceExecutionId = record.sourceExecutionId.map(ExecutionIdentity(_)) - ) - } - } - .toMap - } - - /** - * Upsert cache entry when an output port completes. - * Called by PortCompletedHandler at runtime. - */ - def upsertCachedOutput( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity, - portId: GlobalPortIdentity, - physicalPlan: PhysicalPlan, - resultUri: URI, - tupleCount: Option[Long] - ): Unit = { - val fingerprint = FingerprintUtil.computeSubdagFingerprint(physicalPlan, portId) - val now = System.currentTimeMillis() - - dao.upsert( - OperatorPortCacheRecord( - workflowId = workflowId.id, - globalPortId = portId.serializeAsString, - subdagHash = fingerprint.subdagHash, - fingerprintJson = fingerprint.fingerprintJson, - resultUri = resultUri, - tupleCount = tupleCount, - sourceExecutionId = Some(executionId.id), - createdAt = now, - updatedAt = now - ) - ) - } - - /** - * Invalidate all cache entries for a workflow. - * Useful for manual cache clearing or workflow deletion. 
- */ - def invalidateWorkflowCache(workflowId: WorkflowIdentity): Unit = { - dao.deleteByWorkflow(workflowId.id) - } - - /** - * Future: Cost-aware eviction when storage quota is exceeded. - */ - def evictLowValueEntries(quotaBytes: Long): Unit = { - // Phase 3: Lifecycle management - throw new UnsupportedOperationException("Not yet implemented") - } -} -``` - -#### Step 3: Update WorkflowExecutionService -**File**: `/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala` - -```scala -class WorkflowExecutionService( - controllerConfig: ControllerConfig, - val workflowContext: WorkflowContext, - resultService: ExecutionResultService, - cacheService: OperatorPortCacheService, // INJECT HERE - request: WorkflowExecuteRequest, - val executionStateStore: ExecutionStateStore, - errorHandler: Throwable => Unit, - userEmailOpt: Option[String], - sessionUri: URI -) extends SubscriptionManager with LazyLogging { - - def executeWorkflow(): Unit = { - try { - workflow = new WorkflowCompiler(workflowContext).compile(request.logicalPlan) - - // Use cache service for lookup - val cachedOutputs = cacheService - .lookupCachedOutputs(workflowContext.workflowId, workflow.physicalPlan) - .map { case (gpid, cached) => gpid.serializeAsString -> cached } - - workflowContext.workflowSettings = - workflowContext.workflowSettings.copy(cachedOutputs = cachedOutputs) - } catch { - case err: Throwable => errorHandler(err) - } - // ... 
rest of method - } -} -``` - -#### Step 4: Update PortCompletedHandler -**File**: `/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala` - -```scala -// Inject OperatorPortCacheService into controller/handler - -(storageUriOpt, Option(cp.workflowScheduler.physicalPlan)) match { - case (Some(uri), Some(plan)) => - val tupleCount = try { - Some(DocumentFactory.openDocument(uri)._1.getCount) - } catch { - case _: Throwable => None - } - - // Use cache service for upsert - cacheService.upsertCachedOutput( - cp.workflowContext.workflowId, - cp.workflowContext.executionId, - globalPortId, - plan, - uri, - tupleCount - ) - case _ => // no-op -} -``` +**Architecture**: Event-based communication follows existing patterns (ExecutionStatsUpdate, WorkerAssignmentUpdate). Engine layer has zero knowledge of web/service layer. ### 7. Testing Strategy - **Unit tests**: Fingerprint determinism, cost model logic, region classification @@ -483,33 +255,37 @@ class WorkflowExecutionService( ### Architecture Layers -**Current (Prototype)**: +**Clean Architecture (Implemented)**: ``` -WorkflowExecutionService ───┐ - ├──→ WorkflowExecutionsResource (Jooq) -PortCompletedHandler ───────┘ +WorkflowExecutionService ──→ lookupCachedOutputs() + ↓ +ExecutionCacheService ────→ upsertCachedOutput() OperatorPortCacheService + ↑ ↓ ↓ + └─ registerCallback() ──────┘ OperatorPortCacheDao (Jooq) + (PortMaterialized event) ↓ + operator_port_cache table ``` -**Proposed (Clean Architecture)**: -``` -WorkflowExecutionService ───┐ - ├──→ OperatorPortCacheService ──→ OperatorPortCacheDao (Jooq) -PortCompletedHandler ───────┤ - │ -WorkflowExecutionsResource ─┘ (optional REST endpoints) -``` +**Event-based communication flow**: +1. `PortCompletedHandler` emits `PortMaterialized` event via `sendToClient()` +2. `ExecutionCacheService` registers callback via `client.registerCallback[PortMaterialized]` +3. 
Callback invokes `OperatorPortCacheService.upsertCachedOutput()` +4. Service calls `OperatorPortCacheDao.upsert()` for database persistence -**Refactoring needed**: -1. Extract Jooq code from `WorkflowExecutionsResource` → `OperatorPortCacheDao` -2. Create `OperatorPortCacheService` with workflow-level abstractions -3. Update service call sites to use `OperatorPortCacheService` +**Clean layering**: Engine layer (PortCompletedHandler) has zero knowledge of web/service layer. Event-based pattern matches existing controller communication (ExecutionStatsUpdate, WorkerAssignmentUpdate, etc.). ### Completed Components -- **Schema/migration**: `operator_port_cache` table added (`sql/texera_ddl.sql`, `sql/updates/16.sql`) +- **Schema/migration**: `operator_port_cache` table added (`sql/updates/cache.sql`) + - Columns: `workflow_id`, `global_port_id`, `subdag_hash` (these three together form the composite PK), `fingerprint_json`, `result_uri`, `tuple_count`, `source_execution_id`, `updated_at` + - Timestamp managed by database (`DEFAULT now()`) +- **Service/DAO architecture** (Phase 1.1 - Complete): + - `OperatorPortCacheDao`: Low-level database access with get/upsert/delete methods + - `OperatorPortCacheService`: High-level cache operations (lookupCachedOutputs, upsertCachedOutput, invalidateWorkflowCache) + - `ExecutionCacheService`: Event listener bridging controller events to service layer + - Event-based communication via `PortMaterialized` event and `client.registerCallback[T]` - **Fingerprinting**: `FingerprintUtil` implemented with workflow-based specs for deterministic subDAG hashing -- **Submission-time lookup**: `WorkflowExecutionService` computes fingerprints for all physical output ports, queries cache, stores hits in `WorkflowSettings.cachedOutputs` -- **Cache persistence**: `PortCompletedHandler` upserts to `operator_port_cache` on output port completion (includes fingerprint, URI, tuple count) -- **API layer**: `WorkflowExecutionsResource` exposes cache lookup/upsert helpers +- **Submission-time
lookup**: `WorkflowExecutionService` calls `OperatorPortCacheService.lookupCachedOutputs()`, which computes fingerprints for all physical output ports and queries the cache, then stores the hits in `WorkflowSettings.cachedOutputs` +- **Cache persistence**: `PortCompletedHandler` emits `PortMaterialized` event → `ExecutionCacheService` → `OperatorPortCacheService.upsertCachedOutput()` → `OperatorPortCacheDao.upsert()` (includes fingerprint, URI, tuple count) - **Scheduler integration**: `CostBasedScheduleGenerator` marks regions cached when all required outputs have hits, reuses cached URIs in port configs - **Runtime execution**: `RegionExecutionCoordinator` branches on `region.cached` flag: - ToSkip regions: `completeCachedRegion()` creates shallow state hierarchy, emits synthetic stats (numWorkers=0, processingTime=0), propagates cached URIs downstream @@ -586,18 +362,31 @@ The cache system integrates with three layers: ### Phase 1: Complete Prototype (Engineering) -#### 1.1 Refactor to Service/DAO Architecture -- [ ] Create `OperatorPortCacheDao` with get/upsert/delete methods - - Extract Jooq code from `WorkflowExecutionsResource` - - Define `OperatorPortCacheRecord` case class - - Add unit tests for DAO operations -- [ ] Create `OperatorPortCacheService` with high-level methods - `lookupCachedOutputs(workflowId, physicalPlan)`: batch lookup at submission - `upsertCachedOutput(...)`: cache write on port completion - `invalidateWorkflowCache(workflowId)`: manual invalidation - - Encapsulate fingerprint computation and serialization -- [ ] Refactor
`WorkflowExecutionService.computeCachedOutputs()` to use service -- [ ] Refactor `PortCompletedHandler` to use service + - Encapsulates fingerprint computation and serialization + - Location: `/amber/src/main/scala/org/apache/texera/web/service/OperatorPortCacheService.scala` +- [x] Create `ExecutionCacheService` for event handling + - Registers callback for `PortMaterialized` events + - Bridges controller events to service layer + - Location: `/amber/src/main/scala/org/apache/texera/web/service/ExecutionCacheService.scala` +- [x] Add `PortMaterialized` event type + - Location: `/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala` +- [x] Refactor `WorkflowExecutionService.computeCachedOutputs()` to use service + - Uses `OperatorPortCacheService.lookupCachedOutputs()` +- [x] Refactor `PortCompletedHandler` to emit events + - Emits `PortMaterialized` event via `sendToClient()` instead of direct service calls +- [x] Instantiate services in `WorkflowService` and `WorkflowExecutionService` + - `cacheService` created at workflow level + - `executionCacheService` created per execution +- [ ] Add unit tests for DAO operations - [ ] (Optional) Add REST endpoints in `WorkflowExecutionsResource` that delegate to service #### 1.2 Testing & Validation
