This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit 51624179eedbaa8d793ee245588637413aea6185 Author: Xiaozhen Liu <[email protected]> AuthorDate: Tue Jan 13 14:13:17 2026 -0800 feat(cache): clean up dead code. --- .../user/workflow/WorkflowExecutionsResource.scala | 69 ---------------------- 1 file changed, 69 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 470318d4f6..f0b661e524 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -475,43 +475,6 @@ object WorkflowExecutionsResource { } } - /** - * Upsert cache entry for an operator port. - */ - def upsertOperatorPortCache( - workflowId: WorkflowIdentity, - globalPortId: GlobalPortIdentity, - subdagHash: String, - fingerprintJson: String, - resultUri: java.net.URI, - tupleCount: Option[Long], - sourceExecutionId: Option[ExecutionIdentity] - ): Unit = { - val record = context.newRecord(OPERATOR_PORT_CACHE) - record.setWorkflowId(workflowId.id.toInt) - record.setGlobalPortId(globalPortId.serializeAsString) - record.setSubdagHash(subdagHash) - record.setFingerprintJson(fingerprintJson) - record.setResultUri(resultUri.toString) - tupleCount.foreach(c => record.setTupleCount(Long.box(c))) - sourceExecutionId.foreach(eid => record.setSourceExecutionId(Long.box(eid.id))) - - context - .insertInto(OPERATOR_PORT_CACHE) - .set(record) - .onConflict( - OPERATOR_PORT_CACHE.WORKFLOW_ID, - OPERATOR_PORT_CACHE.GLOBAL_PORT_ID, - OPERATOR_PORT_CACHE.SUBDAG_HASH - ) - .doUpdate() - .set(OPERATOR_PORT_CACHE.RESULT_URI, record.getResultUri) - .set(OPERATOR_PORT_CACHE.FINGERPRINT_JSON, record.getFingerprintJson) - .set(OPERATOR_PORT_CACHE.TUPLE_COUNT, record.getTupleCount) - .set(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID, record.getSourceExecutionId) - .execute() - } - /** * This method is mainly used for frontend requests. Given a logicalOpId and an outputPortId of an execution, * this method finds the URI for a globalPortId that both: 1. matches the logicalOpId and outputPortId, and @@ -563,38 +526,6 @@ object WorkflowExecutionsResource { .map(URI.create) } - /** - * Lookup a cache entry by workflow, port, and subdag hash. - */ - def getOperatorPortCache( - wid: WorkflowIdentity, - globalPortId: GlobalPortIdentity, - subdagHash: String - ): Option[(URI, Option[Long], String, Option[ExecutionIdentity])] = { - context - .select( - OPERATOR_PORT_CACHE.RESULT_URI, - OPERATOR_PORT_CACHE.TUPLE_COUNT, - OPERATOR_PORT_CACHE.FINGERPRINT_JSON, - OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID - ) - .from(OPERATOR_PORT_CACHE) - .where(OPERATOR_PORT_CACHE.WORKFLOW_ID.eq(wid.id.toInt)) - .and(OPERATOR_PORT_CACHE.GLOBAL_PORT_ID.eq(globalPortId.serializeAsString)) - .and(OPERATOR_PORT_CACHE.SUBDAG_HASH.eq(subdagHash)) - .fetchOptional() - .toScala - .map { record => - val uri = URI.create(record.get(OPERATOR_PORT_CACHE.RESULT_URI)) - val tupleCount = Option(record.get(OPERATOR_PORT_CACHE.TUPLE_COUNT)).map(_.longValue()) - val fp = record.get(OPERATOR_PORT_CACHE.FINGERPRINT_JSON) - val sourceEid = - Option(record.get(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID)) - .map(id => ExecutionIdentity(id.longValue())) - (uri, tupleCount, fp, sourceEid) - } - } - case class WorkflowExecutionEntry( eId: Integer, vId: Integer,
