This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b763e442d refactor(amber): rename remaining Akka* identifiers to 
Pekko* (#4949)
2b763e442d is described below

commit 2b763e442d3b9cd787bfd46f238c3b0bbccf0afc
Author: Yicong Huang <[email protected]>
AuthorDate: Mon May 11 21:32:08 2026 -0700

    refactor(amber): rename remaining Akka* identifiers to Pekko* (#4949)
    
    ### What changes were proposed in this PR?
    
    The project moved off Akka onto Apache Pekko, but several internal Scala
    identifiers still carried the `Akka` prefix even though they wrap Pekko
    APIs. Pure rename across `amber` and `common/config`:
    
    - `AkkaConfig` → `PekkoConfig` (object + file)
    - `AkkaActorService` → `PekkoActorService` (class + file)
    - `AkkaActorRefMappingService` → `PekkoActorRefMappingService`
    - `AkkaMessageTransferService` → `PekkoMessageTransferService`
    - `akkaConfig`, `akkaActorService` method/parameter names →
    pekko-prefixed
    
    No behavior change. No string literals, config keys, or serialization
    registrations are touched — `cluster.conf` already uses `pekko.*` keys,
    the kryo registry doesn't reference these classes by name. The
    intentional `"akka"` literal in `DeployStrategiesSpec.scala` that
    contrasts pekko vs akka address strings stays.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4948.
    
    ### How was this PR tested?
    
    `sbt WorkflowExecutionService/Test/compile` clean, `sbt
    WorkflowExecutionService/scalafmtCheckAll` clean.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Opus 4.7 (Claude Code)
---
 .../architecture/common/ExecutorDeployment.scala     |  2 +-
 ...rvice.scala => PekkoActorRefMappingService.scala} |  2 +-
 ...kkaActorService.scala => PekkoActorService.scala} |  2 +-
 ...rvice.scala => PekkoMessageTransferService.scala} |  6 +++---
 .../engine/architecture/common/WorkflowActor.scala   | 10 +++++-----
 .../controller/ControllerProcessor.scala             | 20 ++++++++++----------
 .../controller/ControllerTimerService.scala          |  6 +++---
 .../messaginglayer/WorkerTimerService.scala          |  4 ++--
 .../scheduling/RegionExecutionCoordinator.scala      | 10 +++++-----
 .../scheduling/WorkflowExecutionCoordinator.scala    | 10 +++++-----
 .../texera/amber/engine/common/AmberRuntime.scala    | 10 +++++-----
 .../engine/e2e/ReconfigurationIntegrationSpec.scala  |  2 +-
 .../logreplay/LogreplayPrimitivesSpec.scala          |  2 +-
 .../messaginglayer/CongestionControlSpec.scala       |  2 +-
 .../scheduling/RegionCoordinatorTestSupport.scala    | 10 +++++-----
 .../scheduling/RegionExecutionCoordinatorSpec.scala  |  6 +++---
 .../WorkflowExecutionCoordinatorSpec.scala           |  2 +-
 .../engine/architecture/worker/WorkerSpec.scala      |  2 +-
 .../engine/common/CheckpointSubsystemSpec.scala      |  2 +-
 .../texera/amber/engine/e2e/DataProcessingSpec.scala |  2 +-
 .../apache/texera/amber/engine/e2e/PauseSpec.scala   |  2 +-
 .../amber/engine/e2e/ReconfigurationSpec.scala       |  2 +-
 .../amber/engine/faulttolerance/CheckpointSpec.scala |  2 +-
 .../amber/engine/faulttolerance/LoggingSpec.scala    |  2 +-
 .../config/{AkkaConfig.scala => PekkoConfig.scala}   |  4 ++--
 25 files changed, 62 insertions(+), 62 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
index cf41297c98..fbb5b99ce6 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
@@ -38,7 +38,7 @@ object ExecutorDeployment {
 
   def createWorkers(
       op: PhysicalOp,
-      controllerActorService: AkkaActorService,
+      controllerActorService: PekkoActorService,
       operatorExecution: OperatorExecution,
       operatorConfig: OperatorConfig,
       stateRestoreConfig: Option[StateRestoreConfig],
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorRefMappingService.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorRefMappingService.scala
similarity index 98%
rename from 
amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorRefMappingService.scala
rename to 
amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorRefMappingService.scala
index 6cad314703..323435891e 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorRefMappingService.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorRefMappingService.scala
@@ -33,7 +33,7 @@ import org.apache.texera.amber.util.VirtualIdentityUtils
 
 import scala.collection.mutable
 
-class AkkaActorRefMappingService(actorService: AkkaActorService) extends 
AmberLogging {
+class PekkoActorRefMappingService(actorService: PekkoActorService) extends 
AmberLogging {
 
   override def actorId: ActorVirtualIdentity = actorService.id
 
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorService.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorService.scala
similarity index 96%
rename from 
amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorService.scala
rename to 
amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorService.scala
index 10a6d7a38c..f5bbd0619c 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorService.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorService.scala
@@ -28,7 +28,7 @@ import org.apache.texera.amber.engine.common.FutureBijection._
 import scala.concurrent.ExecutionContext
 import scala.concurrent.duration.{DurationInt, FiniteDuration}
 
-class AkkaActorService(val id: ActorVirtualIdentity, actorContext: 
ActorContext) {
+class PekkoActorService(val id: ActorVirtualIdentity, actorContext: 
ActorContext) {
 
   implicit def ec: ExecutionContext = actorContext.dispatcher
 
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaMessageTransferService.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoMessageTransferService.scala
similarity index 98%
rename from 
amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaMessageTransferService.scala
rename to 
amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoMessageTransferService.scala
index 3401e3ff63..cba9b0b2ee 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaMessageTransferService.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoMessageTransferService.scala
@@ -30,9 +30,9 @@ import 
org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage
 import scala.collection.mutable
 import scala.concurrent.duration.DurationInt
 
-class AkkaMessageTransferService(
-    actorService: AkkaActorService,
-    refService: AkkaActorRefMappingService,
+class PekkoMessageTransferService(
+    actorService: PekkoActorService,
+    refService: PekkoActorRefMappingService,
     handleBackpressure: Boolean => Unit
 ) extends AmberLogging {
 
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/WorkflowActor.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/WorkflowActor.scala
index 5ce64a0a3e..0f744dca26 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/WorkflowActor.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/WorkflowActor.scala
@@ -83,9 +83,9 @@ abstract class WorkflowActor(
     with AmberLogging {
 
   //
-  // Akka related components:
+  // Pekko related components:
   //
-  val actorService: AkkaActorService = new AkkaActorService(actorId, 
this.context)
+  val actorService: PekkoActorService = new PekkoActorService(actorId, 
this.context)
   actorService.getAvailableNodeAddressesFunc = () => {
     implicit val timeout: Timeout = 5.seconds
     Await
@@ -95,12 +95,12 @@ abstract class WorkflowActor(
       )
       .asInstanceOf[Array[Address]]
   }
-  val actorRefMappingService: AkkaActorRefMappingService = new 
AkkaActorRefMappingService(
+  val actorRefMappingService: PekkoActorRefMappingService = new 
PekkoActorRefMappingService(
     actorService
   )
   actorRefMappingService.registerActorRef(actorId, self)
-  val transferService: AkkaMessageTransferService =
-    new AkkaMessageTransferService(actorService, actorRefMappingService, 
handleBackpressure)
+  val transferService: PekkoMessageTransferService =
+    new PekkoMessageTransferService(actorService, actorRefMappingService, 
handleBackpressure)
 
   logger.info(s"worker replay log writing conf: $replayLogConfOpt")
 
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
index 3ff8e7d978..ef33174b6b 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
@@ -22,9 +22,9 @@ package org.apache.texera.amber.engine.architecture.controller
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
 import org.apache.texera.amber.core.workflow.WorkflowContext
 import org.apache.texera.amber.engine.architecture.common.{
-  AkkaActorRefMappingService,
-  AkkaActorService,
-  AkkaMessageTransferService,
+  PekkoActorRefMappingService,
+  PekkoActorService,
+  PekkoMessageTransferService,
   AmberProcessor
 }
 import 
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
@@ -57,21 +57,21 @@ class ControllerProcessor(
     this.controllerTimerService = controllerTimerService
   }
 
-  @transient var transferService: AkkaMessageTransferService = _
+  @transient var transferService: PekkoMessageTransferService = _
 
-  def setupTransferService(transferService: AkkaMessageTransferService): Unit 
= {
+  def setupTransferService(transferService: PekkoMessageTransferService): Unit 
= {
     this.transferService = transferService
   }
 
-  @transient var actorService: AkkaActorService = _
+  @transient var actorService: PekkoActorService = _
 
-  def setupActorService(akkaActorService: AkkaActorService): Unit = {
-    this.actorService = akkaActorService
+  def setupActorService(pekkoActorService: PekkoActorService): Unit = {
+    this.actorService = pekkoActorService
   }
 
-  @transient var actorRefService: AkkaActorRefMappingService = _
+  @transient var actorRefService: PekkoActorRefMappingService = _
 
-  def setupActorRefService(actorRefService: AkkaActorRefMappingService): Unit 
= {
+  def setupActorRefService(actorRefService: PekkoActorRefMappingService): Unit 
= {
     this.actorRefService = actorRefService
     
this.workflowExecutionCoordinator.setupActorRefService(this.actorRefService)
   }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala
index a778a27c46..a4ad0898c9 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala
@@ -20,7 +20,7 @@
 package org.apache.texera.amber.engine.architecture.controller
 
 import org.apache.pekko.actor.Cancellable
-import org.apache.texera.amber.engine.architecture.common.AkkaActorService
+import org.apache.texera.amber.engine.architecture.common.PekkoActorService
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
   QueryStatisticsRequest,
@@ -34,7 +34,7 @@ import scala.concurrent.duration.{DurationInt, 
FiniteDuration, MILLISECONDS}
 
 class ControllerTimerService(
     controllerConfig: ControllerConfig,
-    akkaActorService: AkkaActorService
+    pekkoActorService: PekkoActorService
 ) {
   var statusUpdateAskHandle: Option[Cancellable] = None
   var runtimeStatisticsAskHandle: Option[Cancellable] = None
@@ -46,7 +46,7 @@ class ControllerTimerService(
   ): Option[Cancellable] = {
     if (intervalMs.nonEmpty && handleOpt.isEmpty) {
       Option(
-        akkaActorService.sendToSelfWithFixedDelay(
+        pekkoActorService.sendToSelfWithFixedDelay(
           0.milliseconds,
           FiniteDuration.apply(intervalMs.get, MILLISECONDS),
           ControlInvocation(
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerTimerService.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerTimerService.scala
index 3bb87febd9..006c9614fd 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerTimerService.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerTimerService.scala
@@ -21,7 +21,7 @@ package 
org.apache.texera.amber.engine.architecture.messaginglayer
 
 import org.apache.pekko.actor.Cancellable
 import org.apache.texera.amber.config.ApplicationConfig
-import org.apache.texera.amber.engine.architecture.common.AkkaActorService
+import org.apache.texera.amber.engine.architecture.common.PekkoActorService
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
   EmptyRequest
@@ -33,7 +33,7 @@ import 
org.apache.texera.amber.engine.common.virtualidentity.util.SELF
 
 import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS}
 
-class WorkerTimerService(actorService: AkkaActorService) {
+class WorkerTimerService(actorService: PekkoActorService) {
 
   private val enabledAdaptiveBatching = 
ApplicationConfig.enableAdaptiveNetworkBuffering
   private val adaptiveBatchInterval = 
ApplicationConfig.adaptiveBufferingTimeoutMs
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 254c16bf34..2971e4c4f4 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -26,8 +26,8 @@ import 
org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
 import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, 
PhysicalLink, PhysicalOp}
 import org.apache.texera.amber.engine.architecture.common.{
-  AkkaActorRefMappingService,
-  AkkaActorService,
+  PekkoActorRefMappingService,
+  PekkoActorService,
   ExecutorDeployment
 }
 import org.apache.texera.amber.engine.architecture.controller.execution.{
@@ -95,8 +95,8 @@ class RegionExecutionCoordinator(
     workflowExecution: WorkflowExecution,
     asyncRPCClient: AsyncRPCClient,
     controllerConfig: ControllerConfig,
-    actorService: AkkaActorService,
-    actorRefService: AkkaActorRefMappingService
+    actorService: PekkoActorService,
+    actorRefService: PekkoActorRefMappingService
 ) extends AmberLogging {
 
   initRegionExecution()
@@ -374,7 +374,7 @@ class RegionExecutionCoordinator(
   }
 
   private def buildOperator(
-      actorService: AkkaActorService,
+      actorService: PekkoActorService,
       physicalOp: PhysicalOp,
       operatorConfig: OperatorConfig,
       operatorExecution: OperatorExecution
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index df504bf92d..deb753beb3 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -23,8 +23,8 @@ import com.twitter.util.Future
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink}
 import org.apache.texera.amber.engine.architecture.common.{
-  AkkaActorRefMappingService,
-  AkkaActorService
+  PekkoActorRefMappingService,
+  PekkoActorService
 }
 import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
 import 
org.apache.texera.amber.engine.architecture.controller.ExecutionStateUpdate
@@ -49,9 +49,9 @@ class WorkflowExecutionCoordinator(
     mutable.HashMap()
   private val completionNotified: AtomicBoolean = new AtomicBoolean(false)
 
-  @transient var actorRefService: AkkaActorRefMappingService = _
+  @transient var actorRefService: PekkoActorRefMappingService = _
 
-  def setupActorRefService(actorRefService: AkkaActorRefMappingService): Unit 
= {
+  def setupActorRefService(actorRefService: PekkoActorRefMappingService): Unit 
= {
     this.actorRefService = actorRefService
   }
 
@@ -62,7 +62,7 @@ class WorkflowExecutionCoordinator(
     *
     * After the syncs, if there are no running region(s), it will start new 
regions (if available).
     */
-  def coordinateRegionExecutors(actorService: AkkaActorService): Future[Unit] 
= {
+  def coordinateRegionExecutors(actorService: PekkoActorService): Future[Unit] 
= {
     val unfinishedRegionCoordinators =
       regionExecutionCoordinators.values.filter(!_.isCompleted).toSeq
 
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/common/AmberRuntime.scala 
b/amber/src/main/scala/org/apache/texera/amber/engine/common/AmberRuntime.scala
index 7078f766a6..03234a277e 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/common/AmberRuntime.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/common/AmberRuntime.scala
@@ -23,7 +23,7 @@ import org.apache.pekko.actor.{ActorSystem, Address, 
Cancellable, DeadLetter, Pr
 import org.apache.pekko.serialization.{Serialization, SerializationExtension}
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.texera.amber.clustering.ClusterListener
-import org.apache.texera.amber.config.AkkaConfig
+import org.apache.texera.amber.config.PekkoConfig
 import 
org.apache.texera.amber.engine.architecture.messaginglayer.DeadLetterMonitorActor
 
 import java.io.{BufferedReader, InputStreamReader}
@@ -39,7 +39,7 @@ object AmberRuntime {
   def serde: Serialization = {
     if (_serde == null) {
       if (_actorSystem == null) {
-        _serde = SerializationExtension(ActorSystem("Amber", akkaConfig))
+        _serde = SerializationExtension(ActorSystem("Amber", pekkoConfig))
       } else {
         _serde = SerializationExtension(_actorSystem)
       }
@@ -83,13 +83,13 @@ object AmberRuntime {
         pekko.remote.artery.canonical.hostname = $localIpAddress
         pekko.cluster.seed-nodes = [ "pekko://Amber@$localIpAddress:2552" ]
         """)
-      .withFallback(akkaConfig)
+      .withFallback(pekkoConfig)
       .resolve()
     AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress)
     createAmberSystem(masterConfig)
   }
 
-  def akkaConfig: Config = AkkaConfig.akkaConfig
+  def pekkoConfig: Config = PekkoConfig.pekkoConfig
 
   private def createMasterAddress(addr: String): Address = Address("pekko", 
"Amber", addr, 2552)
 
@@ -105,7 +105,7 @@ object AmberRuntime {
         pekko.remote.artery.canonical.port = 0
         pekko.cluster.seed-nodes = [ "pekko://Amber@$addr:2552" ]
         """)
-      .withFallback(akkaConfig)
+      .withFallback(pekkoConfig)
       .resolve()
     AmberConfig.masterNodeAddr = createMasterAddress(addr)
     createAmberSystem(workerConfig)
diff --git 
a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
 
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
index acadb9154c..6f0936da28 100644
--- 
a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
+++ 
b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala
@@ -62,7 +62,7 @@ import scala.concurrent.duration._
   */
 @IntegrationTest
 class ReconfigurationIntegrationSpec
-    extends TestKit(ActorSystem("ReconfigurationIntegrationSpec", 
AmberRuntime.akkaConfig))
+    extends TestKit(ActorSystem("ReconfigurationIntegrationSpec", 
AmberRuntime.pekkoConfig))
     with ImplicitSender
     with AnyFlatSpecLike
     with BeforeAndAfterAll
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/LogreplayPrimitivesSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/LogreplayPrimitivesSpec.scala
index ddb6440c25..6ec33a6988 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/LogreplayPrimitivesSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/LogreplayPrimitivesSpec.scala
@@ -62,7 +62,7 @@ class LogreplayPrimitivesSpec extends AnyFlatSpec with 
BeforeAndAfterAll {
   // so no Pekko threads outlive the suite. (Same pattern as
   // CheckpointSubsystemSpec.)
   private val testSystem: ActorSystem =
-    ActorSystem("LogreplayPrimitivesSpec-test", AmberRuntime.akkaConfig)
+    ActorSystem("LogreplayPrimitivesSpec-test", AmberRuntime.pekkoConfig)
   private val testSerde: Serialization = SerializationExtension(testSystem)
 
   private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
index 322f96924d..30fa8ec8b4 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
@@ -214,7 +214,7 @@ class CongestionControlSpec extends AnyFlatSpec {
   }
 
   it should "return only the messages whose sentTime is older than 
resendTimeLimit" in {
-    // Cover the AkkaMessageTransferService.checkResend() retransmission path:
+    // Cover the PekkoMessageTransferService.checkResend() retransmission path:
     // the in-transit message that has been sitting past the 60s
     // resendTimeLimit must surface; the freshly-sent one must not.
     val cc = new CongestionControl()
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala
index facba10241..5673c02691 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala
@@ -35,8 +35,8 @@ import org.apache.texera.amber.core.workflow.WorkflowContext.{
   DEFAULT_WORKFLOW_ID
 }
 import org.apache.texera.amber.engine.architecture.common.{
-  AkkaActorRefMappingService,
-  AkkaActorService,
+  PekkoActorRefMappingService,
+  PekkoActorService,
   WorkflowActor
 }
 import 
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
@@ -78,8 +78,8 @@ object RegionCoordinatorTestSupport {
   )
 
   case class ControllerHarnessFixture(
-      actorService: AkkaActorService,
-      actorRefService: AkkaActorRefMappingService
+      actorService: PekkoActorService,
+      actorRefService: PekkoActorRefMappingService
   )
 
   /**
@@ -231,7 +231,7 @@ trait RegionCoordinatorTestSupport { self: TestKit =>
   }
 
   protected def registerLiveWorker(
-      actorRefService: AkkaActorRefMappingService,
+      actorRefService: PekkoActorRefMappingService,
       workerId: ActorVirtualIdentity
   ): ActorRef = {
     val workerRef = system.actorOf(Props(new IdleActor), 
s"worker-${System.nanoTime()}")
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
index 8fab3b67fc..6efbe5e4ca 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
@@ -24,7 +24,7 @@ import org.apache.pekko.actor.ActorSystem
 import org.apache.pekko.testkit.TestKit
 import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}
 import org.apache.texera.amber.core.workflow.PhysicalOp
-import 
org.apache.texera.amber.engine.architecture.common.AkkaActorRefMappingService
+import 
org.apache.texera.amber.engine.architecture.common.PekkoActorRefMappingService
 import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
 import 
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns._
@@ -51,7 +51,7 @@ import java.util.concurrent.atomic
   *    workers terminated, and allow the next region to start.
   */
 class RegionExecutionCoordinatorSpec
-    extends TestKit(ActorSystem("RegionExecutionCoordinatorSpec", 
AmberRuntime.akkaConfig))
+    extends TestKit(ActorSystem("RegionExecutionCoordinatorSpec", 
AmberRuntime.pekkoConfig))
     with AnyFlatSpecLike
     with BeforeAndAfterAll
     with RegionCoordinatorTestSupport {
@@ -117,7 +117,7 @@ class RegionExecutionCoordinatorSpec
       region: Region,
       physicalOp: PhysicalOp,
       workerId: ActorVirtualIdentity,
-      actorRefService: AkkaActorRefMappingService
+      actorRefService: PekkoActorRefMappingService
   )
 
   private def createSingleRegionFixture(
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
index f5fc17f8e0..1a1e4afa31 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
@@ -38,7 +38,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.scalatest.flatspec.AnyFlatSpecLike
 
 class WorkflowExecutionCoordinatorSpec
-    extends TestKit(ActorSystem("WorkflowExecutionCoordinatorSpec", 
AmberRuntime.akkaConfig))
+    extends TestKit(ActorSystem("WorkflowExecutionCoordinatorSpec", 
AmberRuntime.pekkoConfig))
     with AnyFlatSpecLike
     with BeforeAndAfterAll
     with RegionCoordinatorTestSupport {
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala
index 890fe97b85..f6f33ebbb0 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala
@@ -64,7 +64,7 @@ class DummyOperatorExecutor extends OperatorExecutor {
 }
 
 class WorkerSpec
-    extends TestKit(ActorSystem("WorkerSpec", AmberRuntime.akkaConfig))
+    extends TestKit(ActorSystem("WorkerSpec", AmberRuntime.pekkoConfig))
     with ImplicitSender
     with AnyFlatSpecLike
     with BeforeAndAfterAll
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/common/CheckpointSubsystemSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/common/CheckpointSubsystemSpec.scala
index 45b1727afc..6b46030b6b 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/common/CheckpointSubsystemSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/common/CheckpointSubsystemSpec.scala
@@ -35,7 +35,7 @@ class CheckpointSubsystemSpec extends AnyFlatSpec with 
BeforeAndAfterAll {
   // and AmberRuntime's reference are torn down in afterAll, so no Pekko
   // threads outlive the test (matching ControllerSpec/WorkerSpec hygiene).
   private val testSystem: ActorSystem =
-    ActorSystem("CheckpointSubsystemSpec-test", AmberRuntime.akkaConfig)
+    ActorSystem("CheckpointSubsystemSpec-test", AmberRuntime.pekkoConfig)
   private val testSerde: Serialization = SerializationExtension(testSystem)
 
   private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
index f93909f53f..d070fefb27 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
@@ -55,7 +55,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, 
Outcome, Retries}
 import scala.concurrent.duration.DurationInt
 
 class DataProcessingSpec
-    extends TestKit(ActorSystem("DataProcessingSpec", AmberRuntime.akkaConfig))
+    extends TestKit(ActorSystem("DataProcessingSpec", 
AmberRuntime.pekkoConfig))
     with ImplicitSender
     with AnyFlatSpecLike
     with BeforeAndAfterAll
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
index b459533c57..2cc268608f 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/PauseSpec.scala
@@ -51,7 +51,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, 
Outcome, Retries}
 import scala.concurrent.duration._
 
 class PauseSpec
-    extends TestKit(ActorSystem("PauseSpec", AmberRuntime.akkaConfig))
+    extends TestKit(ActorSystem("PauseSpec", AmberRuntime.pekkoConfig))
     with ImplicitSender
     with AnyFlatSpecLike
     with BeforeAndAfterAll
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index 8cabf8684a..2cd3559736 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -42,7 +42,7 @@ import org.scalatest.flatspec.AnyFlatSpecLike
 import scala.concurrent.duration._
 
 class ReconfigurationSpec
-    extends TestKit(ActorSystem("ReconfigurationSpec", 
AmberRuntime.akkaConfig))
+    extends TestKit(ActorSystem("ReconfigurationSpec", 
AmberRuntime.pekkoConfig))
     with ImplicitSender
     with AnyFlatSpecLike
     with BeforeAndAfterAll
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
index 73d4601a92..fbc7e8044d 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala
@@ -59,7 +59,7 @@ class CheckpointSpec extends AnyFlatSpecLike with 
BeforeAndAfterAll {
   )
 
   override def beforeAll(): Unit = {
-    system = ActorSystem("CheckpointSpec", AmberRuntime.akkaConfig)
+    system = ActorSystem("CheckpointSpec", AmberRuntime.pekkoConfig)
     system.actorOf(Props[SingleNodeListener](), "cluster-info")
   }
 
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/LoggingSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/LoggingSpec.scala
index 87e3ca148e..9a38811915 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/LoggingSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/LoggingSpec.scala
@@ -61,7 +61,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 import java.net.URI
 
 class LoggingSpec
-    extends TestKit(ActorSystem("LoggingSpec", AmberRuntime.akkaConfig))
+    extends TestKit(ActorSystem("LoggingSpec", AmberRuntime.pekkoConfig))
     with ImplicitSender
     with AnyFlatSpecLike
     with BeforeAndAfterAll
diff --git 
a/common/config/src/main/scala/org/apache/texera/amber/config/AkkaConfig.scala 
b/common/config/src/main/scala/org/apache/texera/amber/config/PekkoConfig.scala
similarity index 90%
rename from 
common/config/src/main/scala/org/apache/texera/amber/config/AkkaConfig.scala
rename to 
common/config/src/main/scala/org/apache/texera/amber/config/PekkoConfig.scala
index 7f2097063a..33ba24d247 100644
--- 
a/common/config/src/main/scala/org/apache/texera/amber/config/AkkaConfig.scala
+++ 
b/common/config/src/main/scala/org/apache/texera/amber/config/PekkoConfig.scala
@@ -20,11 +20,11 @@ package org.apache.texera.amber.config
 
 import com.typesafe.config.{Config, ConfigFactory}
 
-object AkkaConfig {
+object PekkoConfig {
 
   // Load configuration
   private val conf: Config = 
ConfigFactory.parseResources("cluster.conf").resolve()
 
   // Return the complete Pekko configuration with fallback to default 
application config
-  def akkaConfig: Config = 
conf.withFallback(ConfigFactory.defaultApplication()).resolve()
+  def pekkoConfig: Config = 
conf.withFallback(ConfigFactory.defaultApplication()).resolve()
 }

Reply via email to