This is an automated email from the ASF dual-hosted git repository. Xiao-zhen-Liu pushed a commit to branch xiaozhen-caching-prototype in repository https://gitbox.apache.org/repos/asf/texera.git
commit c08e919d71255d88e273d4edf9a893385c4cf102 Author: Xiaozhen Liu <[email protected]> AuthorDate: Mon Jan 12 20:37:54 2026 -0800 feat(cache): fmt. --- .../promisehandlers/PortCompletedHandler.scala | 12 +++- .../scheduling/CostBasedScheduleGenerator.scala | 13 ++-- .../architecture/scheduling/CostEstimator.scala | 7 ++- .../scheduling/RegionExecutionCoordinator.scala | 69 ++++++++++++++++------ .../scheduling/WorkflowExecutionCoordinator.scala | 5 +- .../user/workflow/WorkflowExecutionsResource.scala | 9 ++- .../web/service/ExecutionResultService.scala | 5 +- .../web/service/WorkflowExecutionService.scala | 6 +- .../scheduling/FingerprintUtilSpec.scala | 5 +- .../core/workflow/cache/FingerprintUtil.scala | 11 +++- 10 files changed, 103 insertions(+), 39 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala index a613c7cef9..254948a0e1 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala @@ -23,8 +23,16 @@ import com.twitter.util.Future import org.apache.texera.amber.core.WorkflowRuntimeException import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.workflow.GlobalPortIdentity -import org.apache.texera.amber.engine.architecture.controller.{ControllerAsyncRPCHandlerInitializer, FatalError, PortMaterialized} -import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, PortCompletedRequest, QueryStatisticsRequest} +import org.apache.texera.amber.engine.architecture.controller.{ + ControllerAsyncRPCHandlerInitializer, + FatalError, + PortMaterialized +} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ + AsyncRPCContext, + PortCompletedRequest, + QueryStatisticsRequest +} import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER import org.apache.texera.amber.util.VirtualIdentityUtils diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala index e324d2b696..4d22db2763 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala @@ -71,11 +71,11 @@ class CostBasedScheduleGenerator( actorId = actorId ) - private val cachedOutputsByPort - : Map[GlobalPortIdentity, CachedOutput] = workflowContext.workflowSettings.cachedOutputs.map { - case (serializedId, cachedOutput) => - GlobalPortIdentitySerde.deserializeFromString(serializedId) -> cachedOutput - } + private val cachedOutputsByPort: Map[GlobalPortIdentity, CachedOutput] = + workflowContext.workflowSettings.cachedOutputs.map { + case (serializedId, cachedOutput) => + GlobalPortIdentitySerde.deserializeFromString(serializedId) -> cachedOutput + } private case class CostEstimatorMemoKey( physicalOpIds: Set[PhysicalOpIdentity], @@ -333,8 +333,7 @@ class CostBasedScheduleGenerator( if (isAcyclic) { annotateRegionsWithCacheInfo(regionDAG, matEdges, opToRegionMap.toMap) Left(regionDAG) - } - else Right(regionGraph) + } else Right(regionGraph) } /** diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala index 02ff87d2c0..a37c389606 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostEstimator.scala @@ -73,11 +73,12 @@ class DefaultCostEstimator( 0.0 } else { val opCost = region.getOperators.size * DEFAULT_OPERATOR_COST - val writeCost = resourceConfig.portConfigs.values.collect { case _: OutputPortConfig => - 0.5 + val writeCost = resourceConfig.portConfigs.values.collect { + case _: OutputPortConfig => + 0.5 }.sum val readCost = resourceConfig.portConfigs.values.collect { - case _: InputPortConfig => 0.5 + case _: InputPortConfig => 0.5 case _: IntermediateInputPortConfig => 0.5 }.sum opCost + writeCost + readCost diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 2b6c9e6d71..e0b6aca821 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -25,17 +25,45 @@ import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink, PhysicalOp} -import org.apache.texera.amber.engine.architecture.common.{AkkaActorRefMappingService, AkkaActorService, ExecutorDeployment} -import org.apache.texera.amber.engine.architecture.controller.execution.{OperatorExecution, RegionExecution, WorkflowExecution} -import org.apache.texera.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate, ExecutionStatsUpdate, WorkerAssignmentUpdate} +import org.apache.texera.amber.engine.architecture.common.{ + AkkaActorRefMappingService, + AkkaActorService, + ExecutorDeployment +} +import org.apache.texera.amber.engine.architecture.controller.execution.{ + OperatorExecution, + RegionExecution, + WorkflowExecution +} +import org.apache.texera.amber.engine.architecture.controller.{ + ControllerConfig, + ExecutionStateUpdate, + ExecutionStatsUpdate, + WorkerAssignmentUpdate +} import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ -import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{EmptyReturn, WorkflowAggregatedState} -import org.apache.texera.amber.engine.architecture.scheduling.config.{InputPortConfig, OperatorConfig, OutputPortConfig, ResourceConfig} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{ + EmptyReturn, + WorkflowAggregatedState +} +import org.apache.texera.amber.engine.architecture.scheduling.config.{ + InputPortConfig, + OperatorConfig, + OutputPortConfig, + ResourceConfig +} import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.Partitioning -import org.apache.texera.amber.engine.architecture.worker.statistics.{PortTupleMetricsMapping, TupleMetrics, WorkerState} +import org.apache.texera.amber.engine.architecture.worker.statistics.{ + PortTupleMetricsMapping, + TupleMetrics, + WorkerState +} import org.apache.texera.amber.engine.common.AmberLogging import org.apache.texera.amber.engine.common.FutureBijection._ -import org.apache.texera.amber.engine.common.executionruntimestate.{OperatorMetrics, OperatorStatistics} +import org.apache.texera.amber.engine.common.executionruntimestate.{ + OperatorMetrics, + OperatorStatistics +} import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER import org.apache.texera.web.SessionState @@ -109,10 +137,12 @@ class RegionExecutionCoordinator( val opExecution = regionExecution.initOperatorExecution(op.id) // Cached regions do not create workers; synthesize operator-level metrics instead. val outputMetrics = op.outputPorts.keys.map { pid => - val count = resourceConfig.portConfigs.collectFirst { - case (gpid, cfg: OutputPortConfig) if gpid == GlobalPortIdentity(op.id, pid) => - cfg.cachedTupleCount.getOrElse(0L) - }.getOrElse(0L) + val count = resourceConfig.portConfigs + .collectFirst { + case (gpid, cfg: OutputPortConfig) if gpid == GlobalPortIdentity(op.id, pid) => + cfg.cachedTupleCount.getOrElse(0L) + } + .getOrElse(0L) PortTupleMetricsMapping(pid, TupleMetrics(count, 0L)) }.toSeq val inputMetrics = op.inputPorts.keys @@ -122,7 +152,7 @@ class RegionExecutionCoordinator( WorkflowAggregatedState.COMPLETED, OperatorStatistics( inputMetrics, - outputMetrics, + outputMetrics ) ) opExecution.setCachedMetrics(stats) @@ -135,13 +165,14 @@ class RegionExecutionCoordinator( } private def recordCachedOutputPortResults(resourceConfig: ResourceConfig): Unit = { - resourceConfig.portConfigs.collect { case (gpid, cfg: OutputPortConfig) => - val storageUri = cfg.storageURI - WorkflowExecutionsResource.insertOperatorPortResultUri( - eid = executionId, - globalPortId = gpid, - uri = storageUri - ) + resourceConfig.portConfigs.collect { + case (gpid, cfg: OutputPortConfig) => + val storageUri = cfg.storageURI + WorkflowExecutionsResource.insertOperatorPortResultUri( + eid = executionId, + globalPortId = gpid, + uri = storageUri + ) } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala index 893955f5a6..1b6074e51e 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala @@ -26,7 +26,10 @@ import org.apache.texera.amber.engine.architecture.common.{ AkkaActorRefMappingService, AkkaActorService } -import org.apache.texera.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate} +import org.apache.texera.amber.engine.architecture.controller.{ + ControllerConfig, + ExecutionStateUpdate +} import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 262d139ff7..470318d4f6 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -499,7 +499,11 @@ object WorkflowExecutionsResource { context .insertInto(OPERATOR_PORT_CACHE) .set(record) - .onConflict(OPERATOR_PORT_CACHE.WORKFLOW_ID, OPERATOR_PORT_CACHE.GLOBAL_PORT_ID, OPERATOR_PORT_CACHE.SUBDAG_HASH) + .onConflict( + OPERATOR_PORT_CACHE.WORKFLOW_ID, + OPERATOR_PORT_CACHE.GLOBAL_PORT_ID, + OPERATOR_PORT_CACHE.SUBDAG_HASH + ) .doUpdate() .set(OPERATOR_PORT_CACHE.RESULT_URI, record.getResultUri) .set(OPERATOR_PORT_CACHE.FINGERPRINT_JSON, record.getFingerprintJson) @@ -585,7 +589,8 @@ object WorkflowExecutionsResource { val tupleCount = Option(record.get(OPERATOR_PORT_CACHE.TUPLE_COUNT)).map(_.longValue()) val fp = record.get(OPERATOR_PORT_CACHE.FINGERPRINT_JSON) val sourceEid = - Option(record.get(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID)).map(id => ExecutionIdentity(id.longValue())) + Option(record.get(OPERATOR_PORT_CACHE.SOURCE_EXECUTION_ID)) + .map(id => ExecutionIdentity(id.longValue())) (uri, tupleCount, fp, sourceEid) } } diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala index 5526cad551..b5c986e32b 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala @@ -334,7 +334,10 @@ class ExecutionResultService( evt.state == COMPLETED || evt.state == FAILED || evt.state == KILLED || evt.state == TERMINATED ) { logger.info("Workflow execution terminated. Stop update results.") - if (resultUpdateCancellable == null|| resultUpdateCancellable.cancel() || resultUpdateCancellable.isCancelled) { + if ( + resultUpdateCancellable == null || resultUpdateCancellable + .cancel() || resultUpdateCancellable.isCancelled + ) { // immediately perform final update onResultUpdate(executionId, physicalPlan) } diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala index 38eef83ad6..7777aaf7ab 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala @@ -30,7 +30,11 @@ import org.apache.texera.amber.engine.common.client.AmberClient import org.apache.texera.amber.engine.common.executionruntimestate.ExecutionMetadataStore import org.apache.texera.amber.core.workflow.{CachedOutput, GlobalPortIdentity} import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps -import org.apache.texera.web.model.websocket.event.{TexeraWebSocketEvent, WorkflowErrorEvent, WorkflowStateEvent} +import org.apache.texera.web.model.websocket.event.{ + TexeraWebSocketEvent, + WorkflowErrorEvent, + WorkflowStateEvent +} import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource import org.apache.texera.web.storage.ExecutionStateStore diff --git a/amber/src/test/scala/org/apache/amber/engine/architecture/scheduling/FingerprintUtilSpec.scala b/amber/src/test/scala/org/apache/amber/engine/architecture/scheduling/FingerprintUtilSpec.scala index b85c06801e..af776df2bd 100644 --- a/amber/src/test/scala/org/apache/amber/engine/architecture/scheduling/FingerprintUtilSpec.scala +++ b/amber/src/test/scala/org/apache/amber/engine/architecture/scheduling/FingerprintUtilSpec.scala @@ -35,7 +35,10 @@ class FingerprintUtilSpec extends AnyFlatSpec with Matchers { private def newCsv(): CSVScanSourceOpDesc = TestOperators.headerlessSmallCsvScanOpDesc() - private def newKeyword(pattern: String = "Asia", logicalId: Option[String] = None): KeywordSearchOpDesc = { + private def newKeyword( + pattern: String = "Asia", + logicalId: Option[String] = None + ): KeywordSearchOpDesc = { val op = TestOperators.keywordSearchOpDesc("column-1", pattern) logicalId.foreach(id => op.setOperatorId(id)) op diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/cache/FingerprintUtil.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/cache/FingerprintUtil.scala index c4c7b2c5d6..b27e09c276 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/cache/FingerprintUtil.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/cache/FingerprintUtil.scala @@ -23,7 +23,12 @@ import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature} import com.fasterxml.jackson.databind.node.ObjectNode import org.apache.texera.amber.core.executor.OpExecInitInfo -import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink, PhysicalOp, PhysicalPlan} +import org.apache.texera.amber.core.workflow.{ + GlobalPortIdentity, + PhysicalLink, + PhysicalOp, + PhysicalPlan +} import java.nio.charset.StandardCharsets import java.security.MessageDigest @@ -112,7 +117,9 @@ object FingerprintUtil { val edgeArray = objectMapper.createArrayNode() links.toList - .sortBy(link => (link.fromOpId.toString, link.fromPortId.id, link.toOpId.toString, link.toPortId.id)) + .sortBy(link => + (link.fromOpId.toString, link.fromPortId.id, link.toOpId.toString, link.toPortId.id) + ) .foreach(link => edgeArray.add(buildEdge(link))) root.set("edges", edgeArray)
