This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype
in repository https://gitbox.apache.org/repos/asf/texera.git

commit c08e919d71255d88e273d4edf9a893385c4cf102
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Mon Jan 12 20:37:54 2026 -0800

    feat(cache): fmt.
---
 .../promisehandlers/PortCompletedHandler.scala     | 12 +++-
 .../scheduling/CostBasedScheduleGenerator.scala    | 13 ++--
 .../architecture/scheduling/CostEstimator.scala    |  7 ++-
 .../scheduling/RegionExecutionCoordinator.scala    | 69 ++++++++++++++++------
 .../scheduling/WorkflowExecutionCoordinator.scala  |  5 +-
 .../user/workflow/WorkflowExecutionsResource.scala |  9 ++-
 .../web/service/ExecutionResultService.scala       |  5 +-
 .../web/service/WorkflowExecutionService.scala     |  6 +-
 .../scheduling/FingerprintUtilSpec.scala           |  5 +-
 .../core/workflow/cache/FingerprintUtil.scala      | 11 +++-
 10 files changed, 103 insertions(+), 39 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
index a613c7cef9..254948a0e1 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala
@@ -23,8 +23,16 @@ import com.twitter.util.Future
 import org.apache.texera.amber.core.WorkflowRuntimeException
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.workflow.GlobalPortIdentity
-import org.apache.texera.amber.engine.architecture.controller.{ControllerAsyncRPCHandlerInitializer, FatalError, PortMaterialized}
-import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, PortCompletedRequest, QueryStatisticsRequest}
+import org.apache.texera.amber.engine.architecture.controller.{
+  ControllerAsyncRPCHandlerInitializer,
+  FatalError,
+  PortMaterialized
+}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+  AsyncRPCContext,
+  PortCompletedRequest,
+  QueryStatisticsRequest
+}
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
 import org.apache.texera.amber.util.VirtualIdentityUtils
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index e324d2b696..4d22db2763 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -71,11 +71,11 @@ class CostBasedScheduleGenerator(
       actorId = actorId
     )
 
-  private val cachedOutputsByPort
-      : Map[GlobalPortIdentity, CachedOutput] = workflowContext.workflowSettings.cachedOutputs.map {
-    case (serializedId, cachedOutput) =>
-      GlobalPortIdentitySerde.deserializeFromString(serializedId) -> cachedOutput
-  }
+  private val cachedOutputsByPort: Map[GlobalPortIdentity, CachedOutput] =
+    workflowContext.workflowSettings.cachedOutputs.map {
+      case (serializedId, cachedOutput) =>
+        GlobalPortIdentitySerde.deserializeFromString(serializedId) -> cachedOutput
+    }
 
   private case class CostEstimatorMemoKey(
       physicalOpIds: Set[PhysicalOpIdentity],
@@ -333,8 +333,7 @@ class CostBasedScheduleGenerator(
     if (isAcyclic) {
       annotateRegionsWithCacheInfo(regionDAG, matEdges, opToRegionMap.toMap)
       Left(regionDAG)
-    }
-    else Right(regionGraph)
+    } else Right(regionGraph)
   }
 
   /**
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala
index 02ff87d2c0..a37c389606 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala
@@ -73,11 +73,12 @@ class DefaultCostEstimator(
         0.0
       } else {
         val opCost = region.getOperators.size * DEFAULT_OPERATOR_COST
-        val writeCost = resourceConfig.portConfigs.values.collect { case _: OutputPortConfig =>
-          0.5
+        val writeCost = resourceConfig.portConfigs.values.collect {
+          case _: OutputPortConfig =>
+            0.5
         }.sum
         val readCost = resourceConfig.portConfigs.values.collect {
-          case _: InputPortConfig              => 0.5
+          case _: InputPortConfig             => 0.5
           case _: IntermediateInputPortConfig => 0.5
         }.sum
         opCost + writeCost + readCost
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 2b6c9e6d71..e0b6aca821 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -25,17 +25,45 @@ import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
 import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink, PhysicalOp}
-import org.apache.texera.amber.engine.architecture.common.{AkkaActorRefMappingService, AkkaActorService, ExecutorDeployment}
-import org.apache.texera.amber.engine.architecture.controller.execution.{OperatorExecution, RegionExecution, WorkflowExecution}
-import org.apache.texera.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate, ExecutionStatsUpdate, WorkerAssignmentUpdate}
+import org.apache.texera.amber.engine.architecture.common.{
+  AkkaActorRefMappingService,
+  AkkaActorService,
+  ExecutorDeployment
+}
+import org.apache.texera.amber.engine.architecture.controller.execution.{
+  OperatorExecution,
+  RegionExecution,
+  WorkflowExecution
+}
+import org.apache.texera.amber.engine.architecture.controller.{
+  ControllerConfig,
+  ExecutionStateUpdate,
+  ExecutionStatsUpdate,
+  WorkerAssignmentUpdate
+}
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands._
-import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{EmptyReturn, WorkflowAggregatedState}
-import org.apache.texera.amber.engine.architecture.scheduling.config.{InputPortConfig, OperatorConfig, OutputPortConfig, ResourceConfig}
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{
+  EmptyReturn,
+  WorkflowAggregatedState
+}
+import org.apache.texera.amber.engine.architecture.scheduling.config.{
+  InputPortConfig,
+  OperatorConfig,
+  OutputPortConfig,
+  ResourceConfig
+}
 import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.Partitioning
-import org.apache.texera.amber.engine.architecture.worker.statistics.{PortTupleMetricsMapping, TupleMetrics, WorkerState}
+import org.apache.texera.amber.engine.architecture.worker.statistics.{
+  PortTupleMetricsMapping,
+  TupleMetrics,
+  WorkerState
+}
 import org.apache.texera.amber.engine.common.AmberLogging
 import org.apache.texera.amber.engine.common.FutureBijection._
-import org.apache.texera.amber.engine.common.executionruntimestate.{OperatorMetrics, OperatorStatistics}
+import org.apache.texera.amber.engine.common.executionruntimestate.{
+  OperatorMetrics,
+  OperatorStatistics
+}
 import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
 import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
 import org.apache.texera.web.SessionState
@@ -109,10 +137,12 @@ class RegionExecutionCoordinator(
       val opExecution = regionExecution.initOperatorExecution(op.id)
       // Cached regions do not create workers; synthesize operator-level metrics instead.
       val outputMetrics = op.outputPorts.keys.map { pid =>
-        val count = resourceConfig.portConfigs.collectFirst {
-          case (gpid, cfg: OutputPortConfig) if gpid == GlobalPortIdentity(op.id, pid) =>
-            cfg.cachedTupleCount.getOrElse(0L)
-        }.getOrElse(0L)
+        val count = resourceConfig.portConfigs
+          .collectFirst {
+            case (gpid, cfg: OutputPortConfig) if gpid == GlobalPortIdentity(op.id, pid) =>
+              cfg.cachedTupleCount.getOrElse(0L)
+          }
+          .getOrElse(0L)
         PortTupleMetricsMapping(pid, TupleMetrics(count, 0L))
       }.toSeq
       val inputMetrics = op.inputPorts.keys
@@ -122,7 +152,7 @@ class RegionExecutionCoordinator(
         WorkflowAggregatedState.COMPLETED,
         OperatorStatistics(
           inputMetrics,
-          outputMetrics,
+          outputMetrics
         )
       )
       opExecution.setCachedMetrics(stats)
@@ -135,13 +165,14 @@ class RegionExecutionCoordinator(
   }
 
   private def recordCachedOutputPortResults(resourceConfig: ResourceConfig): Unit = {
-    resourceConfig.portConfigs.collect { case (gpid, cfg: OutputPortConfig) =>
-      val storageUri = cfg.storageURI
-      WorkflowExecutionsResource.insertOperatorPortResultUri(
-        eid = executionId,
-        globalPortId = gpid,
-        uri = storageUri
-      )
+    resourceConfig.portConfigs.collect {
+      case (gpid, cfg: OutputPortConfig) =>
+        val storageUri = cfg.storageURI
+        WorkflowExecutionsResource.insertOperatorPortResultUri(
+          eid = executionId,
+          globalPortId = gpid,
+          uri = storageUri
+        )
     }
   }
 
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index 893955f5a6..1b6074e51e 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -26,7 +26,10 @@ import org.apache.texera.amber.engine.architecture.common.{
   AkkaActorRefMappingService,
   AkkaActorService
 }
-import org.apache.texera.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate}
+import org.apache.texera.amber.engine.architecture.controller.{
+  ControllerConfig,
+  ExecutionStateUpdate
+}
 import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
 import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
 
diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
index 262d139ff7..470318d4f6 100644
--- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
+++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
@@ -499,7 +499,11 @@ object WorkflowExecutionsResource {
     context
       .insertInto(OPERATOR_PORT_CACHE)
       .set(record)
-      .onConflict(OPERATOR_PORT_CACHE.WORKFLOW_ID, OPERATOR_PORT_CACHE.GLOBAL_PORT_ID, OPERATOR_PORT_CACHE.SUBDAG_HASH)
+      .onConflict(
+        OPERATOR_PORT_CACHE.WORKFLOW_ID,
+        OPERATOR_PORT_CACHE.GLOBAL_PORT_ID,
+        OPERATOR_PORT_CACHE.SUBDAG_HASH
+      )
       .doUpdate()
       .set(OPERATOR_PORT_CACHE.RESULT_URI, record.getResultUri)
       .set(OPERATOR_PORT_CACHE.FINGERPRINT_JSON, record.getFingerprintJson)
@@ -585,7 +589,8 @@ object WorkflowExecutionsResource {
         val tupleCount = Option(record.get(OPERATOR_PORT_CACHE.TUPLE_COUNT)).map(_.longValue())
         val fp = record.get(OPERATOR_PORT_CACHE.FINGERPRINT_JSON)
         val sourceEid =
-          Option(record.get(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID)).map(id => ExecutionIdentity(id.longValue()))
+          Option(record.get(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID))
+            .map(id => ExecutionIdentity(id.longValue()))
         (uri, tupleCount, fp, sourceEid)
       }
   }
diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala
index 5526cad551..b5c986e32b 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala
@@ -334,7 +334,10 @@ class ExecutionResultService(
             evt.state == COMPLETED || evt.state == FAILED || evt.state == KILLED || evt.state == TERMINATED
           ) {
             logger.info("Workflow execution terminated. Stop update results.")
-            if (resultUpdateCancellable == null|| resultUpdateCancellable.cancel() || resultUpdateCancellable.isCancelled) {
+            if (
+              resultUpdateCancellable == null || resultUpdateCancellable
+                .cancel() || resultUpdateCancellable.isCancelled
+            ) {
               // immediately perform final update
               onResultUpdate(executionId, physicalPlan)
             }
diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
index 38eef83ad6..7777aaf7ab 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
@@ -30,7 +30,11 @@ import org.apache.texera.amber.engine.common.client.AmberClient
 import org.apache.texera.amber.engine.common.executionruntimestate.ExecutionMetadataStore
 import org.apache.texera.amber.core.workflow.{CachedOutput, GlobalPortIdentity}
 import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
-import org.apache.texera.web.model.websocket.event.{TexeraWebSocketEvent, WorkflowErrorEvent, WorkflowStateEvent}
+import org.apache.texera.web.model.websocket.event.{
+  TexeraWebSocketEvent,
+  WorkflowErrorEvent,
+  WorkflowStateEvent
+}
 import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest
 import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
 import org.apache.texera.web.storage.ExecutionStateStore
diff --git a/amber/src/test/scala/org/apache/amber/engine/architecture/scheduling/FingerprintUtilSpec.scala b/amber/src/test/scala/org/apache/amber/engine/architecture/scheduling/FingerprintUtilSpec.scala
index b85c06801e..af776df2bd 100644
--- a/amber/src/test/scala/org/apache/amber/engine/architecture/scheduling/FingerprintUtilSpec.scala
+++ b/amber/src/test/scala/org/apache/amber/engine/architecture/scheduling/FingerprintUtilSpec.scala
@@ -35,7 +35,10 @@ class FingerprintUtilSpec extends AnyFlatSpec with Matchers {
   private def newCsv(): CSVScanSourceOpDesc =
     TestOperators.headerlessSmallCsvScanOpDesc()
 
-  private def newKeyword(pattern: String = "Asia", logicalId: Option[String] = None): KeywordSearchOpDesc = {
+  private def newKeyword(
+      pattern: String = "Asia",
+      logicalId: Option[String] = None
+  ): KeywordSearchOpDesc = {
     val op = TestOperators.keywordSearchOpDesc("column-1", pattern)
     logicalId.foreach(id => op.setOperatorId(id))
     op
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/cache/FingerprintUtil.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/cache/FingerprintUtil.scala
index c4c7b2c5d6..b27e09c276 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/cache/FingerprintUtil.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/cache/FingerprintUtil.scala
@@ -23,7 +23,12 @@ import com.fasterxml.jackson.annotation.JsonInclude
 import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
 import com.fasterxml.jackson.databind.node.ObjectNode
 import org.apache.texera.amber.core.executor.OpExecInitInfo
-import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink, PhysicalOp, PhysicalPlan}
+import org.apache.texera.amber.core.workflow.{
+  GlobalPortIdentity,
+  PhysicalLink,
+  PhysicalOp,
+  PhysicalPlan
+}
 
 import java.nio.charset.StandardCharsets
 import java.security.MessageDigest
@@ -112,7 +117,9 @@ object FingerprintUtil {
 
     val edgeArray = objectMapper.createArrayNode()
     links.toList
-      .sortBy(link => (link.fromOpId.toString, link.fromPortId.id, link.toOpId.toString, link.toPortId.id))
+      .sortBy(link =>
+        (link.fromOpId.toString, link.fromPortId.id, link.toOpId.toString, link.toPortId.id)
+      )
       .foreach(link => edgeArray.add(buildEdge(link)))
     root.set("edges", edgeArray)
 

Reply via email to