This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 8be96862f [CELEBORN-726] Update data replication terminology from 
`master/slave` to `primary/replica` for configurations and metrics
8be96862f is described below

commit 8be96862f7ee008bb475bd1b61e42021fdb6325a
Author: Fu Chen <[email protected]>
AuthorDate: Thu Jun 29 09:47:02 2023 +0800

    [CELEBORN-726] Update data replication terminology from `master/slave` to 
`primary/replica` for configurations and metrics
    
    ### What changes were proposed in this pull request?
    
    This pull PR is an integral component of #1639 . It primarily focuses on 
updating configuration settings and metrics terminology, while ensuring 
compatibility with older client versions by refraining from introducing changes 
related to RPC.
    
    ### Why are the changes needed?
    
    In order to distinguish it from the existing master/worker, refactor data 
replication terminology to 'primary/replica' for improved clarity and 
inclusivity in the codebase
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    existing tests.
    
    Closes #1650 from cfmcgrady/primary-replica-metrics.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 17c1e018746f903beac07334a6790b79898dca80)
    Signed-off-by: Cheng Pan <[email protected]>
---
 METRICS.md                                         |  4 +-
 assets/grafana/rss-dashboard.json                  | 66 +++++++++++-----------
 .../celeborn/client/WorkerStatusTracker.scala      |  8 +--
 .../org/apache/celeborn/common/CelebornConf.scala  | 24 ++++----
 docs/configuration/client.md                       |  2 +-
 docs/migration.md                                  | 14 +++++
 docs/monitoring.md                                 | 18 +++---
 .../celeborn/tests/spark/PushDataTimeoutTest.scala | 10 ++--
 .../service/deploy/worker/PushDataHandler.scala    | 36 ++++++------
 .../service/deploy/worker/WorkerSource.scala       | 32 +++++------
 10 files changed, 114 insertions(+), 100 deletions(-)

diff --git a/METRICS.md b/METRICS.md
index 3cd32313d..1ddcb85ce 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -73,8 +73,8 @@ Here is an example of grafana dashboard importing.
 |             FlushDataTime              |      worker       |                 
                 FlushData means flush a disk buffer to disk.                   
                |
 |             OpenStreamTime             |      worker       |            
OpenStream means read a shuffle file and send client about chunks size and 
stream index.             |
 |             FetchChunkTime             |      worker       |                 
     FetchChunk means read a chunk from a shuffle file and send to client.      
                |
-|           MasterPushDataTime           |      worker       |                 
      MasterPushData means handle pushdata of master partition location.        
                |
-|           SlavePushDataTime            |      worker       |                 
       SlavePushData means handle pushdata of slave partition location.         
                |
+|           PrimaryPushDataTime          |      worker       |                 
      PrimaryPushData means handle pushdata of primary partition location.      
                  |
+|           ReplicaPushDataTime          |      worker       |                 
       ReplicaPushData means handle pushdata of replica partition location.     
                    |
 |           WriteDataFailCount           |      worker       |                 
   The count of writing PushData or PushMergedData failed in current worker.    
                |
 |         ReplicateDataFailCount         |      worker       |                 
 The count of replicating PushData or PushMergedData failed in current worker.  
                |
 |      ReplicateDataWriteFailCount       |      worker       |       The count 
of replicating PushData or PushMergedData failed caused by write failure in 
peer worker.        |
diff --git a/assets/grafana/rss-dashboard.json 
b/assets/grafana/rss-dashboard.json
index 2a424a984..726c97b03 100644
--- a/assets/grafana/rss-dashboard.json
+++ b/assets/grafana/rss-dashboard.json
@@ -1353,11 +1353,11 @@
                 "type": "prometheus",
                 "uid": "${DS_PROMETHEUS}"
               },
-              "expr": "metrics_MasterPushDataTime_Mean",
+              "expr": "metrics_PrimaryPushDataTime_Mean",
               "refId": "A"
             }
           ],
-          "title": "metrics_MasterPushDataTime_Mean",
+          "title": "metrics_PrimaryPushDataTime_Mean",
           "type": "timeseries"
         },
         {
@@ -1442,11 +1442,11 @@
                 "type": "prometheus",
                 "uid": "${DS_PROMETHEUS}"
               },
-              "expr": "metrics_MasterPushDataTime_Max",
+              "expr": "metrics_PrimaryPushDataTime_Max",
               "refId": "A"
             }
           ],
-          "title": "metrics_MasterPushDataTime_Max",
+          "title": "metrics_PrimaryPushDataTime_Max",
           "type": "timeseries"
         },
         {
@@ -1531,11 +1531,11 @@
                 "type": "prometheus",
                 "uid": "${DS_PROMETHEUS}"
               },
-              "expr": "metrics_SlavePushDataTime_Mean",
+              "expr": "metrics_ReplicaPushDataTime_Mean",
               "refId": "A"
             }
           ],
-          "title": "metrics_SlavePushDataTime_Mean",
+          "title": "metrics_ReplicaPushDataTime_Mean",
           "type": "timeseries"
         },
         {
@@ -1620,11 +1620,11 @@
                 "type": "prometheus",
                 "uid": "${DS_PROMETHEUS}"
               },
-              "expr": "metrics_SlavePushDataTime_Max",
+              "expr": "metrics_ReplicaPushDataTime_Max",
               "refId": "A"
             }
           ],
-          "title": "metrics_SlavePushDataTime_Max",
+          "title": "metrics_ReplicaPushDataTime_Max",
           "type": "timeseries"
         },
         {
@@ -5040,12 +5040,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "editorMode": "code",
-              "expr": "metrics_MasterPushDataHandshakeTime_Mean",
+              "expr": "metrics_PrimaryPushDataHandshakeTime_Mean",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_MasterPushDataHandshakeTime_Mean",
+          "title": "metrics_PrimaryPushDataHandshakeTime_Mean",
           "type": "timeseries"
         },
         {
@@ -5131,12 +5131,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "editorMode": "code",
-              "expr": "metrics_MasterPushDataHandshakeTime_Max",
+              "expr": "metrics_PrimaryPushDataHandshakeTime_Max",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_MasterPushDataHandshakeTime_Max",
+          "title": "metrics_PrimaryPushDataHandshakeTime_Max",
           "type": "timeseries"
         },
         {
@@ -5222,12 +5222,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "editorMode": "code",
-              "expr": "metrics_SlavePushDataHandshakeTime_Mean",
+              "expr": "metrics_ReplicaPushDataHandshakeTime_Mean",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_SlavePushDataHandshakeTime_Mean",
+          "title": "metrics_ReplicaPushDataHandshakeTime_Mean",
           "type": "timeseries"
         },
         {
@@ -5313,12 +5313,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "editorMode": "code",
-              "expr": "metrics_SlavePushDataHandshakeTime_Max",
+              "expr": "metrics_ReplicaPushDataHandshakeTime_Max",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_SlavePushDataHandshakeTime_Max",
+          "title": "metrics_ReplicaPushDataHandshakeTime_Max",
           "type": "timeseries"
         },
         {
@@ -5404,12 +5404,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "editorMode": "code",
-              "expr": "metrics_MasterRegionStartTime_Mean",
+              "expr": "metrics_PrimaryRegionStartTime_Mean",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_MasterRegionStartTime_Mean",
+          "title": "metrics_PrimaryRegionStartTime_Mean",
           "type": "timeseries"
         },
         {
@@ -5495,12 +5495,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "editorMode": "code",
-              "expr": "metrics_MasterRegionStartTime_Max",
+              "expr": "metrics_PrimaryRegionStartTime_Max",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_MasterRegionStartTime_Max",
+          "title": "metrics_PrimaryRegionStartTime_Max",
           "type": "timeseries"
         },
         {
@@ -5586,12 +5586,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "editorMode": "code",
-              "expr": "metrics_SlaveRegionStartTime_Mean",
+              "expr": "metrics_ReplicaRegionStartTime_Mean",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_SlaveRegionStartTime_Mean",
+          "title": "metrics_ReplicaRegionStartTime_Mean",
           "type": "timeseries"
         },
         {
@@ -5677,12 +5677,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "editorMode": "code",
-              "expr": "metrics_SlaveRegionStartTime_Max",
+              "expr": "metrics_ReplicaRegionStartTime_Max",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_SlaveRegionStartTime_Max",
+          "title": "metrics_ReplicaRegionStartTime_Max",
           "type": "timeseries"
         },
         {
@@ -5768,12 +5768,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "editorMode": "code",
-              "expr": "metrics_MasterRegionFinishTime_Mean",
+              "expr": "metrics_PrimaryRegionFinishTime_Mean",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_MasterRegionFinishTime_Mean",
+          "title": "metrics_PrimaryRegionFinishTime_Mean",
           "type": "timeseries"
         },
         {
@@ -5859,12 +5859,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "editorMode": "code",
-              "expr": "metrics_MasterRegionFinishTime_Max",
+              "expr": "metrics_PrimaryRegionFinishTime_Max",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_MasterRegionFinishTime_Max",
+          "title": "metrics_PrimaryRegionFinishTime_Max",
           "type": "timeseries"
         },
         {
@@ -5950,12 +5950,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "editorMode": "code",
-              "expr": "metrics_SlaveRegionFinishTime_Mean",
+              "expr": "metrics_ReplicaRegionFinishTime_Mean",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_SlaveRegionFinishTime_Mean",
+          "title": "metrics_ReplicaRegionFinishTime_Mean",
           "type": "timeseries"
         },
         {
@@ -6041,12 +6041,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "editorMode": "code",
-              "expr": "metrics_SlaveRegionFinishTime_Max",
+              "expr": "metrics_ReplicaRegionFinishTime_Max",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_SlaveRegionFinishTime_Max",
+          "title": "metrics_ReplicaRegionFinishTime_Max",
           "type": "timeseries"
         },
         {
@@ -7401,4 +7401,4 @@
   "uid": "U_qgru_7z",
   "version": 16,
   "weekStart": ""
-}
\ No newline at end of file
+}
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala 
b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index 1c71c2a45..2519bad3b 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -84,26 +84,26 @@ class WorkerStatusTracker(
         case StatusCode.PUSH_DATA_WRITE_FAIL_MASTER =>
           excludeWorker(oldPartition, StatusCode.PUSH_DATA_WRITE_FAIL_MASTER)
         case StatusCode.PUSH_DATA_WRITE_FAIL_SLAVE
-            if oldPartition.hasPeer && conf.clientExcludeSlaveOnFailureEnabled 
=>
+            if oldPartition.hasPeer && 
conf.clientExcludeReplicaOnFailureEnabled =>
           excludeWorker(oldPartition.getPeer, 
StatusCode.PUSH_DATA_WRITE_FAIL_SLAVE)
         case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER =>
           excludeWorker(oldPartition, 
StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER)
         case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE
-            if oldPartition.hasPeer && conf.clientExcludeSlaveOnFailureEnabled 
=>
+            if oldPartition.hasPeer && 
conf.clientExcludeReplicaOnFailureEnabled =>
           excludeWorker(
             oldPartition.getPeer,
             StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE)
         case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER =>
           excludeWorker(oldPartition, 
StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER)
         case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE
-            if oldPartition.hasPeer && conf.clientExcludeSlaveOnFailureEnabled 
=>
+            if oldPartition.hasPeer && 
conf.clientExcludeReplicaOnFailureEnabled =>
           excludeWorker(
             oldPartition.getPeer,
             StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE)
         case StatusCode.PUSH_DATA_TIMEOUT_MASTER =>
           excludeWorker(oldPartition, StatusCode.PUSH_DATA_TIMEOUT_MASTER)
         case StatusCode.PUSH_DATA_TIMEOUT_SLAVE
-            if oldPartition.hasPeer && conf.clientExcludeSlaveOnFailureEnabled 
=>
+            if oldPartition.hasPeer && 
conf.clientExcludeReplicaOnFailureEnabled =>
           excludeWorker(
             oldPartition.getPeer,
             StatusCode.PUSH_DATA_TIMEOUT_SLAVE)
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 85717075f..6b0bef42c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -695,7 +695,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
   def clientCheckedUseAllocatedWorkers: Boolean = 
get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
   def clientExcludedWorkerExpireTimeout: Long = 
get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
-  def clientExcludeSlaveOnFailureEnabled: Boolean = 
get(CLIENT_EXCLUDE_SLAVE_ON_FAILURE_ENABLED)
+  def clientExcludeReplicaOnFailureEnabled: Boolean = 
get(CLIENT_EXCLUDE_REPLICA_ON_FAILURE_ENABLED)
 
   // //////////////////////////////////////////////////////
   //               Shuffle Compression                   //
@@ -985,8 +985,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   // //////////////////////////////////////////////////////
   def testFetchFailure: Boolean = get(TEST_CLIENT_FETCH_FAILURE)
   def testRetryCommitFiles: Boolean = get(TEST_CLIENT_RETRY_COMMIT_FILE)
-  def testPushMasterDataTimeout: Boolean = 
get(TEST_CLIENT_PUSH_MASTER_DATA_TIMEOUT)
-  def testPushSlaveDataTimeout: Boolean = 
get(TEST_WORKER_PUSH_SLAVE_DATA_TIMEOUT)
+  def testPushPrimaryDataTimeout: Boolean = 
get(TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT)
+  def testPushReplicaDataTimeout: Boolean = 
get(TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT)
   def testRetryRevive: Boolean = get(TEST_CLIENT_RETRY_REVIVE)
   def testAlternative: String = get(TEST_ALTERNATIVE.key, "celeborn")
   def clientFlinkMemoryPerResultPartitionMin: Long = 
get(CLIENT_MEMORY_PER_RESULT_PARTITION_MIN)
@@ -2575,12 +2575,12 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("10s")
 
-  val CLIENT_EXCLUDE_SLAVE_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
-    buildConf("celeborn.client.excludeSlaveOnFailure.enabled")
+  val CLIENT_EXCLUDE_REPLICA_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.excludeReplicaOnFailure.enabled")
       .categories("client")
       .version("0.3.0")
       .doc("When true, Celeborn will exclude partition's peer worker on 
failure " +
-        "when push data to slave failed.")
+        "when push data to replica failed.")
       .booleanConf
       .createWithDefault(true)
 
@@ -2742,22 +2742,22 @@ object CelebornConf extends Logging {
       .checkValue(_ > 0, "celeborn.client.push.data.timeout must be positive!")
       .createWithDefaultString("120s")
 
-  val TEST_CLIENT_PUSH_MASTER_DATA_TIMEOUT: ConfigEntry[Boolean] =
-    buildConf("celeborn.test.worker.pushMasterDataTimeout")
+  val TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT: ConfigEntry[Boolean] =
+    buildConf("celeborn.test.worker.pushPrimaryDataTimeout")
       .withAlternative("celeborn.test.pushMasterDataTimeout")
       .internal
       .categories("test", "worker")
       .version("0.3.0")
-      .doc("Whether to test push master data timeout")
+      .doc("Whether to test push primary data timeout")
       .booleanConf
       .createWithDefault(false)
 
-  val TEST_WORKER_PUSH_SLAVE_DATA_TIMEOUT: ConfigEntry[Boolean] =
-    buildConf("celeborn.test.worker.pushSlaveDataTimeout")
+  val TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT: ConfigEntry[Boolean] =
+    buildConf("celeborn.test.worker.pushReplicaDataTimeout")
       .internal
       .categories("test", "worker")
       .version("0.3.0")
-      .doc("Whether to test push slave data timeout")
+      .doc("Whether to test push replica data timeout")
       .booleanConf
       .createWithDefault(false)
 
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index f93ba9f97..bec86be1c 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -22,7 +22,7 @@ license: |
 | celeborn.client.application.heartbeatInterval | 10s | Interval for client to 
send heartbeat message to master. | 0.3.0 | 
 | celeborn.client.closeIdleConnections | true | Whether client will close idle 
connections. | 0.3.0 | 
 | celeborn.client.commitFiles.ignoreExcludedWorker | false | When true, 
LifecycleManager will skip workers which are in the excluded list. | 0.3.0 | 
-| celeborn.client.excludeSlaveOnFailure.enabled | true | When true, Celeborn 
will exclude partition's peer worker on failure when push data to slave failed. 
| 0.3.0 | 
+| celeborn.client.excludeReplicaOnFailure.enabled | true | When true, Celeborn 
will exclude partition's peer worker on failure when push data to replica 
failed. | 0.3.0 | 
 | celeborn.client.excludedWorker.expireTimeout | 180s | Timeout time for 
LifecycleManager to clear reserved excluded worker. Default to be 1.5 * 
`celeborn.master.heartbeat.worker.timeout`to cover worker heartbeat timeout 
check period | 0.3.0 | 
 | celeborn.client.fetch.excludeWorkerOnFailure.enabled | false | Whether to 
enable shuffle client-side fetch exclude workers on failure. | 0.3.0 | 
 | celeborn.client.fetch.excludedWorker.expireTimeout | &lt;value of 
celeborn.client.excludedWorker.expireTimeout&gt; | ShuffleClient is a static 
object, it will be used in the whole lifecycle of Executor,We give a expire 
time for excluded workers to avoid a transient worker issues. | 0.3.0 | 
diff --git a/docs/migration.md b/docs/migration.md
index af76683ee..81170aa53 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -47,3 +47,17 @@ license: |
  - Since 0.3.0, Celeborn master metrics `BlacklistedWorkerCount` is renamed as 
`ExcludedWorkerCount`.
 
  - Since 0.3.0, Celeborn master http request url `/blacklistedWorkers` is 
renamed as `/excludedWorkers`.
+
+ - Since 0.3.0, introduces a terminology update for Celeborn worker data 
replication, replacing the previous `master/slave` terminology with 
`primary/replica`. In alignment with this change, corresponding metrics 
keywords have been adjusted.
+   The following table presents a comprehensive overview of the changes:
+
+     | Key Before v0.3.0             | Key After v0.3.0               |
+     |-------------------------------|--------------------------------|
+     | `MasterPushDataTime`          | `PrimaryPushDataTime`          |
+     | `MasterPushDataHandshakeTime` | `PrimaryPushDataHandshakeTime` |
+     | `MasterRegionStartTime`       | `PrimaryRegionStartTime`       |
+     | `MasterRegionFinishTime`      | `PrimaryRegionFinishTime`      |
+     | `SlavePushDataTime`           | `ReplicaPushDataTime`          |
+     | `SlavePushDataHandshakeTime`  | `ReplicaPushDataHandshakeTime` |
+     | `SlaveRegionStartTime`        | `ReplicaRegionStartTime`       |
+     | `SlaveRegionFinishTime`       | `ReplicaRegionFinishTime`      |
diff --git a/docs/monitoring.md b/docs/monitoring.md
index efe93b4d5..92db7c922 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -134,9 +134,9 @@ These metrics are exposed by Celeborn worker.
       - The time for a worker to process openStream RPC and return 
StreamHandle.
     - FetchChunkTime
       - The time for a worker to fetch a chunk which is 8MB by default from a 
reduced partition. 
-    - MasterPushDataTime
+    - PrimaryPushDataTime
       - The time for a worker to handle a pushData RPC sent from a celeborn 
client.
-    - SlavePushDataTime
+    - ReplicaPushDataTime
       - The time for a worker to handle a pushData RPC sent from a celeborn 
worker by replicating.
     - WriteDataFailCount
     - ReplicateDataFailCount
@@ -147,12 +147,12 @@ These metrics are exposed by Celeborn worker.
     - PushDataHandshakeFailCount
     - RegionStartFailCount
     - RegionFinishFailCount
-    - MasterPushDataHandshakeTime
-    - SlavePushDataHandshakeTime
-    - MasterRegionStartTime
-    - SlaveRegionStartTime
-    - MasterRegionFinishTime
-    - SlaveRegionFinishTime
+    - PrimaryPushDataHandshakeTime
+    - ReplicaPushDataHandshakeTime
+    - PrimaryRegionStartTime
+    - ReplicaRegionStartTime
+    - PrimaryRegionFinishTime
+    - ReplicaRegionFinishTime
     - TakeBufferTime
       - The time for a worker to take out a buffer from a disk flusher.
     - RegisteredShuffleCount
@@ -297,4 +297,4 @@ API path listed as below:
 | /listPartitionLocationInfo | worker          | List all living 
PartitionLocation information in that worker.                                   
                                                                                
      |
 | /unavailablePeers          | worker          | List the unavailable peers of 
the worker, this always means the worker connect to the peer failed.            
                                                                        |
 | /isShutdown                | worker          | Show if the worker is during 
the process of shutdown.                                                        
                                                                         |
-| /isRegistered              | worker          | Show if the worker is 
registered to the master success.                                               
                                                                                
|
\ No newline at end of file
+| /isRegistered              | worker          | Show if the worker is 
registered to the master success.                                               
                                                                                
|
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
index 95714fc58..11fd29197 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
@@ -38,8 +38,8 @@ class PushDataTimeoutTest extends AnyFunSuite
   override def beforeAll(): Unit = {
     logInfo("test initialized, setup celeborn mini cluster")
     val workerConf = Map(
-      CelebornConf.TEST_CLIENT_PUSH_MASTER_DATA_TIMEOUT.key -> "true",
-      CelebornConf.TEST_WORKER_PUSH_SLAVE_DATA_TIMEOUT.key -> "true")
+      CelebornConf.TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT.key -> "true",
+      CelebornConf.TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT.key -> "true")
     // required at least 4 workers, the reason behind this requirement is that 
when replication is
     // enabled, there is a possibility that two workers might be added to the 
excluded list due to
     // master/slave timeout issues, then there are not enough workers to do 
replication if available
@@ -65,7 +65,7 @@ class PushDataTimeoutTest extends AnyFunSuite
         .set(s"spark.${CelebornConf.CLIENT_PUSH_DATA_TIMEOUT.key}", "5s")
         .set(s"spark.celeborn.data.push.timeoutCheck.interval", "2s")
         .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", 
enabled.toString)
-        
.set(s"spark.${CelebornConf.CLIENT_EXCLUDE_SLAVE_ON_FAILURE_ENABLED.key}", 
"false")
+        
.set(s"spark.${CelebornConf.CLIENT_EXCLUDE_REPLICA_ON_FAILURE_ENABLED.key}", 
"false")
         // make sure PushDataHandler.handlePushData be triggered
         .set(s"spark.${CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key}", "5")
 
@@ -97,7 +97,7 @@ class PushDataTimeoutTest extends AnyFunSuite
         .set(s"spark.${CelebornConf.CLIENT_PUSH_DATA_TIMEOUT.key}", "5s")
         .set(s"spark.celeborn.data.push.timeoutCheck.interval", "2s")
         .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", 
enabled.toString)
-        
.set(s"spark.${CelebornConf.CLIENT_EXCLUDE_SLAVE_ON_FAILURE_ENABLED.key}", 
"false")
+        
.set(s"spark.${CelebornConf.CLIENT_EXCLUDE_REPLICA_ON_FAILURE_ENABLED.key}", 
"false")
 
       val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
       val sqlResult = runsql(sparkSession)
@@ -123,7 +123,7 @@ class PushDataTimeoutTest extends AnyFunSuite
   test("celeborn spark integration test - pushdata timeout will add to 
pushExcludedWorkers") {
     val sparkConf = new 
SparkConf().setAppName("rss-demo").setMaster("local[2]")
       .set(s"spark.${CelebornConf.CLIENT_PUSH_DATA_TIMEOUT.key}", "5s")
-      
.set(s"spark.${CelebornConf.CLIENT_EXCLUDE_SLAVE_ON_FAILURE_ENABLED.key}", 
"true")
+      
.set(s"spark.${CelebornConf.CLIENT_EXCLUDE_REPLICA_ON_FAILURE_ENABLED.key}", 
"true")
       .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "true")
     val rssSparkSession = SparkSession.builder()
       .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 54f167e4c..09d2d2af6 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -58,8 +58,8 @@ class PushDataHandler extends BaseMessageHandler with Logging 
{
   private var workerPartitionSplitEnabled: Boolean = _
   private var workerReplicateRandomConnectionEnabled: Boolean = _
 
-  private var testPushMasterDataTimeout: Boolean = _
-  private var testPushSlaveDataTimeout: Boolean = _
+  private var testPushPrimaryDataTimeout: Boolean = _
+  private var testPushReplicaDataTimeout: Boolean = _
 
   def init(worker: Worker): Unit = {
     workerSource = worker.workerSource
@@ -79,8 +79,8 @@ class PushDataHandler extends BaseMessageHandler with Logging 
{
     workerPartitionSplitEnabled = worker.conf.workerPartitionSplitEnabled
     workerReplicateRandomConnectionEnabled = 
worker.conf.workerReplicateRandomConnectionEnabled
 
-    testPushMasterDataTimeout = worker.conf.testPushMasterDataTimeout
-    testPushSlaveDataTimeout = worker.conf.testPushSlaveDataTimeout
+    testPushPrimaryDataTimeout = worker.conf.testPushPrimaryDataTimeout
+    testPushReplicaDataTimeout = worker.conf.testPushReplicaDataTimeout
 
     logInfo(s"diskReserveSize $diskReserveSize")
   }
@@ -131,12 +131,12 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
     val isMaster = mode == PartitionLocation.Mode.MASTER
 
     // For test
-    if (isMaster && testPushMasterDataTimeout &&
+    if (isMaster && testPushPrimaryDataTimeout &&
       !PushDataHandler.pushMasterDataTimeoutTested.getAndSet(true)) {
       return
     }
 
-    if (!isMaster && testPushSlaveDataTimeout &&
+    if (!isMaster && testPushReplicaDataTimeout &&
       !PushDataHandler.pushSlaveDataTimeoutTested.getAndSet(true)) {
       return
     }
@@ -146,13 +146,13 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
       if (isMaster) {
         new RpcResponseCallbackWithTimer(
           workerSource,
-          WorkerSource.MasterPushDataTime,
+          WorkerSource.PrimaryPushDataTime,
           key,
           callback)
       } else {
         new RpcResponseCallbackWithTimer(
           workerSource,
-          WorkerSource.SlavePushDataTime,
+          WorkerSource.ReplicaPushDataTime,
           key,
           callback)
       }
@@ -394,24 +394,24 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
       if (isMaster) {
         new RpcResponseCallbackWithTimer(
           workerSource,
-          WorkerSource.MasterPushDataTime,
+          WorkerSource.PrimaryPushDataTime,
           key,
           callback)
       } else {
         new RpcResponseCallbackWithTimer(
           workerSource,
-          WorkerSource.SlavePushDataTime,
+          WorkerSource.ReplicaPushDataTime,
           key,
           callback)
       }
 
     // For test
-    if (isMaster && testPushMasterDataTimeout &&
+    if (isMaster && testPushPrimaryDataTimeout &&
       !PushDataHandler.pushMasterMergeDataTimeoutTested.getAndSet(true)) {
       return
     }
 
-    if (!isMaster && testPushSlaveDataTimeout &&
+    if (!isMaster && testPushReplicaDataTimeout &&
       !PushDataHandler.pushSlaveMergeDataTimeoutTested.getAndSet(true)) {
       return
     }
@@ -726,9 +726,9 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
 
     val key = s"${pushData.requestId}"
     if (isMaster) {
-      workerSource.startTimer(WorkerSource.MasterPushDataTime, key)
+      workerSource.startTimer(WorkerSource.PrimaryPushDataTime, key)
     } else {
-      workerSource.startTimer(WorkerSource.SlavePushDataTime, key)
+      workerSource.startTimer(WorkerSource.ReplicaPushDataTime, key)
     }
 
     // find FileWriter responsible for the data
@@ -746,7 +746,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
         pushData.requestId,
         null,
         location,
-        if (isMaster) WorkerSource.MasterPushDataTime else 
WorkerSource.SlavePushDataTime,
+        if (isMaster) WorkerSource.PrimaryPushDataTime else 
WorkerSource.ReplicaPushDataTime,
         callback)
 
     if (locationIsNull(
@@ -845,11 +845,11 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
     val (workerSourceMaster, workerSourceSlave) =
       messageType match {
         case Type.PUSH_DATA_HAND_SHAKE =>
-          (WorkerSource.MasterPushDataHandshakeTime, 
WorkerSource.SlavePushDataHandshakeTime)
+          (WorkerSource.PrimaryPushDataHandshakeTime, 
WorkerSource.ReplicaPushDataHandshakeTime)
         case Type.REGION_START =>
-          (WorkerSource.MasterRegionStartTime, 
WorkerSource.SlaveRegionStartTime)
+          (WorkerSource.PrimaryRegionStartTime, 
WorkerSource.ReplicaRegionStartTime)
         case Type.REGION_FINISH =>
-          (WorkerSource.MasterRegionFinishTime, 
WorkerSource.SlaveRegionFinishTime)
+          (WorkerSource.PrimaryRegionFinishTime, 
WorkerSource.ReplicaRegionFinishTime)
         case _ => throw new IllegalArgumentException(s"Not support 
$messageType yet")
       }
 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 6cbff783b..e29fbff03 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -41,15 +41,15 @@ class WorkerSource(conf: CelebornConf) extends 
AbstractSource(conf, MetricsSyste
   addTimer(CommitFilesTime)
   addTimer(ReserveSlotsTime)
   addTimer(FlushDataTime)
-  addTimer(MasterPushDataTime)
-  addTimer(SlavePushDataTime)
+  addTimer(PrimaryPushDataTime)
+  addTimer(ReplicaPushDataTime)
 
-  addTimer(MasterPushDataHandshakeTime)
-  addTimer(SlavePushDataHandshakeTime)
-  addTimer(MasterRegionStartTime)
-  addTimer(SlaveRegionStartTime)
-  addTimer(MasterRegionFinishTime)
-  addTimer(SlaveRegionFinishTime)
+  addTimer(PrimaryPushDataHandshakeTime)
+  addTimer(ReplicaPushDataHandshakeTime)
+  addTimer(PrimaryRegionStartTime)
+  addTimer(ReplicaRegionStartTime)
+  addTimer(PrimaryRegionFinishTime)
+  addTimer(ReplicaRegionFinishTime)
 
   addTimer(FetchChunkTime)
   addTimer(OpenStreamTime)
@@ -72,8 +72,8 @@ object WorkerSource {
   val FetchChunkTime = "FetchChunkTime"
 
   // push data
-  val MasterPushDataTime = "MasterPushDataTime"
-  val SlavePushDataTime = "SlavePushDataTime"
+  val PrimaryPushDataTime = "PrimaryPushDataTime"
+  val ReplicaPushDataTime = "ReplicaPushDataTime"
   val WriteDataFailCount = "WriteDataFailCount"
   val ReplicateDataFailCount = "ReplicateDataFailCount"
   val ReplicateDataWriteFailCount = "ReplicateDataWriteFailCount"
@@ -83,12 +83,12 @@ object WorkerSource {
   val PushDataHandshakeFailCount = "PushDataHandshakeFailCount"
   val RegionStartFailCount = "RegionStartFailCount"
   val RegionFinishFailCount = "RegionFinishFailCount"
-  val MasterPushDataHandshakeTime = "MasterPushDataHandshakeTime"
-  val SlavePushDataHandshakeTime = "SlavePushDataHandshakeTime"
-  val MasterRegionStartTime = "MasterRegionStartTime"
-  val SlaveRegionStartTime = "SlaveRegionStartTime"
-  val MasterRegionFinishTime = "MasterRegionFinishTime"
-  val SlaveRegionFinishTime = "SlaveRegionFinishTime"
+  val PrimaryPushDataHandshakeTime = "PrimaryPushDataHandshakeTime"
+  val ReplicaPushDataHandshakeTime = "ReplicaPushDataHandshakeTime"
+  val PrimaryRegionStartTime = "PrimaryRegionStartTime"
+  val ReplicaRegionStartTime = "ReplicaRegionStartTime"
+  val PrimaryRegionFinishTime = "PrimaryRegionFinishTime"
+  val ReplicaRegionFinishTime = "ReplicaRegionFinishTime"
 
   // flush
   val TakeBufferTime = "TakeBufferTime"


Reply via email to