This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 8be96862f [CELEBORN-726] Update data replication terminology from `master/slave` to `primary/replica` for configurations and metrics
8be96862f is described below
commit 8be96862f7ee008bb475bd1b61e42021fdb6325a
Author: Fu Chen <[email protected]>
AuthorDate: Thu Jun 29 09:47:02 2023 +0800
    [CELEBORN-726] Update data replication terminology from `master/slave` to `primary/replica` for configurations and metrics
### What changes were proposed in this pull request?
This PR is part of #1639. It focuses on updating configuration and metrics terminology, while preserving compatibility with older client versions by avoiding any RPC-related changes.
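
For reference, where an old key must keep working, the patch relies on the existing `withAlternative` hook of the `buildConf` DSL, which lets a renamed entry still resolve its legacy key. A minimal sketch of that pattern, taken from the `CelebornConf` hunk below (in this patch only the test-timeout entry registers an alternative):

```scala
// Rename-with-fallback pattern from CelebornConf: the entry is registered
// under the new primary/replica key, while withAlternative keeps the legacy
// key resolvable for configs written against older releases.
val TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT: ConfigEntry[Boolean] =
  buildConf("celeborn.test.worker.pushPrimaryDataTimeout")
    .withAlternative("celeborn.test.pushMasterDataTimeout") // legacy key
    .internal
    .categories("test", "worker")
    .version("0.3.0")
    .doc("Whether to test push primary data timeout")
    .booleanConf
    .createWithDefault(false)
```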
### Why are the changes needed?
To distinguish the data replication roles from the existing master/worker deployment roles, this change refactors the data replication terminology to `primary/replica` for improved clarity and inclusivity in the codebase.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #1650 from cfmcgrady/primary-replica-metrics.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 17c1e018746f903beac07334a6790b79898dca80)
Signed-off-by: Cheng Pan <[email protected]>
---
METRICS.md | 4 +-
assets/grafana/rss-dashboard.json | 66 +++++++++++-----------
.../celeborn/client/WorkerStatusTracker.scala | 8 +--
.../org/apache/celeborn/common/CelebornConf.scala | 24 ++++----
docs/configuration/client.md | 2 +-
docs/migration.md | 14 +++++
docs/monitoring.md | 18 +++---
.../celeborn/tests/spark/PushDataTimeoutTest.scala | 10 ++--
.../service/deploy/worker/PushDataHandler.scala | 36 ++++++------
.../service/deploy/worker/WorkerSource.scala | 32 +++++------
10 files changed, 114 insertions(+), 100 deletions(-)
diff --git a/METRICS.md b/METRICS.md
index 3cd32313d..1ddcb85ce 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -73,8 +73,8 @@ Here is an example of grafana dashboard importing.
 | FlushDataTime | worker | FlushData means flush a disk buffer to disk. |
 | OpenStreamTime | worker | OpenStream means read a shuffle file and send client about chunks size and stream index. |
 | FetchChunkTime | worker | FetchChunk means read a chunk from a shuffle file and send to client. |
-| MasterPushDataTime | worker | MasterPushData means handle pushdata of master partition location. |
-| SlavePushDataTime | worker | SlavePushData means handle pushdata of slave partition location. |
+| PrimaryPushDataTime | worker | PrimaryPushData means handle pushdata of primary partition location. |
+| ReplicaPushDataTime | worker | ReplicaPushData means handle pushdata of replica partition location. |
 | WriteDataFailCount | worker | The count of writing PushData or PushMergedData failed in current worker. |
 | ReplicateDataFailCount | worker | The count of replicating PushData or PushMergedData failed in current worker. |
 | ReplicateDataWriteFailCount | worker | The count of replicating PushData or PushMergedData failed caused by write failure in peer worker. |
diff --git a/assets/grafana/rss-dashboard.json b/assets/grafana/rss-dashboard.json
index 2a424a984..726c97b03 100644
--- a/assets/grafana/rss-dashboard.json
+++ b/assets/grafana/rss-dashboard.json
@@ -1353,11 +1353,11 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "expr": "metrics_MasterPushDataTime_Mean",
+ "expr": "metrics_PrimaryPushDataTime_Mean",
"refId": "A"
}
],
- "title": "metrics_MasterPushDataTime_Mean",
+ "title": "metrics_PrimaryPushDataTime_Mean",
"type": "timeseries"
},
{
@@ -1442,11 +1442,11 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "expr": "metrics_MasterPushDataTime_Max",
+ "expr": "metrics_PrimaryPushDataTime_Max",
"refId": "A"
}
],
- "title": "metrics_MasterPushDataTime_Max",
+ "title": "metrics_PrimaryPushDataTime_Max",
"type": "timeseries"
},
{
@@ -1531,11 +1531,11 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "expr": "metrics_SlavePushDataTime_Mean",
+ "expr": "metrics_ReplicaPushDataTime_Mean",
"refId": "A"
}
],
- "title": "metrics_SlavePushDataTime_Mean",
+ "title": "metrics_ReplicaPushDataTime_Mean",
"type": "timeseries"
},
{
@@ -1620,11 +1620,11 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "expr": "metrics_SlavePushDataTime_Max",
+ "expr": "metrics_ReplicaPushDataTime_Max",
"refId": "A"
}
],
- "title": "metrics_SlavePushDataTime_Max",
+ "title": "metrics_ReplicaPushDataTime_Max",
"type": "timeseries"
},
{
@@ -5040,12 +5040,12 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr": "metrics_MasterPushDataHandshakeTime_Mean",
+ "expr": "metrics_PrimaryPushDataHandshakeTime_Mean",
"range": true,
"refId": "A"
}
],
- "title": "metrics_MasterPushDataHandshakeTime_Mean",
+ "title": "metrics_PrimaryPushDataHandshakeTime_Mean",
"type": "timeseries"
},
{
@@ -5131,12 +5131,12 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr": "metrics_MasterPushDataHandshakeTime_Max",
+ "expr": "metrics_PrimaryPushDataHandshakeTime_Max",
"range": true,
"refId": "A"
}
],
- "title": "metrics_MasterPushDataHandshakeTime_Max",
+ "title": "metrics_PrimaryPushDataHandshakeTime_Max",
"type": "timeseries"
},
{
@@ -5222,12 +5222,12 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr": "metrics_SlavePushDataHandshakeTime_Mean",
+ "expr": "metrics_ReplicaPushDataHandshakeTime_Mean",
"range": true,
"refId": "A"
}
],
- "title": "metrics_SlavePushDataHandshakeTime_Mean",
+ "title": "metrics_ReplicaPushDataHandshakeTime_Mean",
"type": "timeseries"
},
{
@@ -5313,12 +5313,12 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr": "metrics_SlavePushDataHandshakeTime_Max",
+ "expr": "metrics_ReplicaPushDataHandshakeTime_Max",
"range": true,
"refId": "A"
}
],
- "title": "metrics_SlavePushDataHandshakeTime_Max",
+ "title": "metrics_ReplicaPushDataHandshakeTime_Max",
"type": "timeseries"
},
{
@@ -5404,12 +5404,12 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr": "metrics_MasterRegionStartTime_Mean",
+ "expr": "metrics_PrimaryRegionStartTime_Mean",
"range": true,
"refId": "A"
}
],
- "title": "metrics_MasterRegionStartTime_Mean",
+ "title": "metrics_PrimaryRegionStartTime_Mean",
"type": "timeseries"
},
{
@@ -5495,12 +5495,12 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr": "metrics_MasterRegionStartTime_Max",
+ "expr": "metrics_PrimaryRegionStartTime_Max",
"range": true,
"refId": "A"
}
],
- "title": "metrics_MasterRegionStartTime_Max",
+ "title": "metrics_PrimaryRegionStartTime_Max",
"type": "timeseries"
},
{
@@ -5586,12 +5586,12 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr": "metrics_SlaveRegionStartTime_Mean",
+ "expr": "metrics_ReplicaRegionStartTime_Mean",
"range": true,
"refId": "A"
}
],
- "title": "metrics_SlaveRegionStartTime_Mean",
+ "title": "metrics_ReplicaRegionStartTime_Mean",
"type": "timeseries"
},
{
@@ -5677,12 +5677,12 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr": "metrics_SlaveRegionStartTime_Max",
+ "expr": "metrics_ReplicaRegionStartTime_Max",
"range": true,
"refId": "A"
}
],
- "title": "metrics_SlaveRegionStartTime_Max",
+ "title": "metrics_ReplicaRegionStartTime_Max",
"type": "timeseries"
},
{
@@ -5768,12 +5768,12 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr": "metrics_MasterRegionFinishTime_Mean",
+ "expr": "metrics_PrimaryRegionFinishTime_Mean",
"range": true,
"refId": "A"
}
],
- "title": "metrics_MasterRegionFinishTime_Mean",
+ "title": "metrics_PrimaryRegionFinishTime_Mean",
"type": "timeseries"
},
{
@@ -5859,12 +5859,12 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr": "metrics_MasterRegionFinishTime_Max",
+ "expr": "metrics_PrimaryRegionFinishTime_Max",
"range": true,
"refId": "A"
}
],
- "title": "metrics_MasterRegionFinishTime_Max",
+ "title": "metrics_PrimaryRegionFinishTime_Max",
"type": "timeseries"
},
{
@@ -5950,12 +5950,12 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr": "metrics_SlaveRegionFinishTime_Mean",
+ "expr": "metrics_ReplicaRegionFinishTime_Mean",
"range": true,
"refId": "A"
}
],
- "title": "metrics_SlaveRegionFinishTime_Mean",
+ "title": "metrics_ReplicaRegionFinishTime_Mean",
"type": "timeseries"
},
{
@@ -6041,12 +6041,12 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr": "metrics_SlaveRegionFinishTime_Max",
+ "expr": "metrics_ReplicaRegionFinishTime_Max",
"range": true,
"refId": "A"
}
],
- "title": "metrics_SlaveRegionFinishTime_Max",
+ "title": "metrics_ReplicaRegionFinishTime_Max",
"type": "timeseries"
},
{
@@ -7401,4 +7401,4 @@
"uid": "U_qgru_7z",
"version": 16,
"weekStart": ""
-}
\ No newline at end of file
+}
diff --git a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index 1c71c2a45..2519bad3b 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -84,26 +84,26 @@ class WorkerStatusTracker(
         case StatusCode.PUSH_DATA_WRITE_FAIL_MASTER =>
           excludeWorker(oldPartition, StatusCode.PUSH_DATA_WRITE_FAIL_MASTER)
         case StatusCode.PUSH_DATA_WRITE_FAIL_SLAVE
-            if oldPartition.hasPeer && conf.clientExcludeSlaveOnFailureEnabled =>
+            if oldPartition.hasPeer && conf.clientExcludeReplicaOnFailureEnabled =>
           excludeWorker(oldPartition.getPeer, StatusCode.PUSH_DATA_WRITE_FAIL_SLAVE)
         case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER =>
           excludeWorker(oldPartition, StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER)
         case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE
-            if oldPartition.hasPeer && conf.clientExcludeSlaveOnFailureEnabled =>
+            if oldPartition.hasPeer && conf.clientExcludeReplicaOnFailureEnabled =>
           excludeWorker(
             oldPartition.getPeer,
             StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE)
         case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER =>
           excludeWorker(oldPartition, StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER)
         case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE
-            if oldPartition.hasPeer && conf.clientExcludeSlaveOnFailureEnabled =>
+            if oldPartition.hasPeer && conf.clientExcludeReplicaOnFailureEnabled =>
           excludeWorker(
             oldPartition.getPeer,
             StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE)
         case StatusCode.PUSH_DATA_TIMEOUT_MASTER =>
           excludeWorker(oldPartition, StatusCode.PUSH_DATA_TIMEOUT_MASTER)
         case StatusCode.PUSH_DATA_TIMEOUT_SLAVE
-            if oldPartition.hasPeer && conf.clientExcludeSlaveOnFailureEnabled =>
+            if oldPartition.hasPeer && conf.clientExcludeReplicaOnFailureEnabled =>
           excludeWorker(
             oldPartition.getPeer,
             StatusCode.PUSH_DATA_TIMEOUT_SLAVE)
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 85717075f..6b0bef42c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -695,7 +695,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
   def clientCheckedUseAllocatedWorkers: Boolean = get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
   def clientExcludedWorkerExpireTimeout: Long = get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
-  def clientExcludeSlaveOnFailureEnabled: Boolean = get(CLIENT_EXCLUDE_SLAVE_ON_FAILURE_ENABLED)
+  def clientExcludeReplicaOnFailureEnabled: Boolean = get(CLIENT_EXCLUDE_REPLICA_ON_FAILURE_ENABLED)
// //////////////////////////////////////////////////////
// Shuffle Compression //
@@ -985,8 +985,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   // //////////////////////////////////////////////////////
   def testFetchFailure: Boolean = get(TEST_CLIENT_FETCH_FAILURE)
   def testRetryCommitFiles: Boolean = get(TEST_CLIENT_RETRY_COMMIT_FILE)
-  def testPushMasterDataTimeout: Boolean = get(TEST_CLIENT_PUSH_MASTER_DATA_TIMEOUT)
-  def testPushSlaveDataTimeout: Boolean = get(TEST_WORKER_PUSH_SLAVE_DATA_TIMEOUT)
+  def testPushPrimaryDataTimeout: Boolean = get(TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT)
+  def testPushReplicaDataTimeout: Boolean = get(TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT)
   def testRetryRevive: Boolean = get(TEST_CLIENT_RETRY_REVIVE)
   def testAlternative: String = get(TEST_ALTERNATIVE.key, "celeborn")
   def clientFlinkMemoryPerResultPartitionMin: Long = get(CLIENT_MEMORY_PER_RESULT_PARTITION_MIN)
@@ -2575,12 +2575,12 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")
- val CLIENT_EXCLUDE_SLAVE_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
- buildConf("celeborn.client.excludeSlaveOnFailure.enabled")
+ val CLIENT_EXCLUDE_REPLICA_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.excludeReplicaOnFailure.enabled")
.categories("client")
.version("0.3.0")
.doc("When true, Celeborn will exclude partition's peer worker on
failure " +
- "when push data to slave failed.")
+ "when push data to replica failed.")
.booleanConf
.createWithDefault(true)
@@ -2742,22 +2742,22 @@ object CelebornConf extends Logging {
.checkValue(_ > 0, "celeborn.client.push.data.timeout must be positive!")
.createWithDefaultString("120s")
- val TEST_CLIENT_PUSH_MASTER_DATA_TIMEOUT: ConfigEntry[Boolean] =
- buildConf("celeborn.test.worker.pushMasterDataTimeout")
+ val TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT: ConfigEntry[Boolean] =
+ buildConf("celeborn.test.worker.pushPrimaryDataTimeout")
.withAlternative("celeborn.test.pushMasterDataTimeout")
.internal
.categories("test", "worker")
.version("0.3.0")
- .doc("Whether to test push master data timeout")
+ .doc("Whether to test push primary data timeout")
.booleanConf
.createWithDefault(false)
- val TEST_WORKER_PUSH_SLAVE_DATA_TIMEOUT: ConfigEntry[Boolean] =
- buildConf("celeborn.test.worker.pushSlaveDataTimeout")
+ val TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT: ConfigEntry[Boolean] =
+ buildConf("celeborn.test.worker.pushReplicaDataTimeout")
.internal
.categories("test", "worker")
.version("0.3.0")
- .doc("Whether to test push slave data timeout")
+ .doc("Whether to test push replica data timeout")
.booleanConf
.createWithDefault(false)
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index f93ba9f97..bec86be1c 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -22,7 +22,7 @@ license: |
 | celeborn.client.application.heartbeatInterval | 10s | Interval for client to send heartbeat message to master. | 0.3.0 |
 | celeborn.client.closeIdleConnections | true | Whether client will close idle connections. | 0.3.0 |
 | celeborn.client.commitFiles.ignoreExcludedWorker | false | When true, LifecycleManager will skip workers which are in the excluded list. | 0.3.0 |
-| celeborn.client.excludeSlaveOnFailure.enabled | true | When true, Celeborn will exclude partition's peer worker on failure when push data to slave failed. | 0.3.0 |
+| celeborn.client.excludeReplicaOnFailure.enabled | true | When true, Celeborn will exclude partition's peer worker on failure when push data to replica failed. | 0.3.0 |
 | celeborn.client.excludedWorker.expireTimeout | 180s | Timeout time for LifecycleManager to clear reserved excluded worker. Default to be 1.5 * `celeborn.master.heartbeat.worker.timeout`to cover worker heartbeat timeout check period | 0.3.0 |
 | celeborn.client.fetch.excludeWorkerOnFailure.enabled | false | Whether to enable shuffle client-side fetch exclude workers on failure. | 0.3.0 |
 | celeborn.client.fetch.excludedWorker.expireTimeout | <value of celeborn.client.excludedWorker.expireTimeout> | ShuffleClient is a static object, it will be used in the whole lifecycle of Executor,We give a expire time for excluded workers to avoid a transient worker issues. | 0.3.0 |
diff --git a/docs/migration.md b/docs/migration.md
index af76683ee..81170aa53 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -47,3 +47,17 @@ license: |
 - Since 0.3.0, Celeborn master metrics `BlacklistedWorkerCount` is renamed as `ExcludedWorkerCount`.
 - Since 0.3.0, Celeborn master http request url `/blacklistedWorkers` is renamed as `/excludedWorkers`.
+
+ - Since 0.3.0, introduces a terminology update for Celeborn worker data replication, replacing the previous `master/slave` terminology with `primary/replica`. In alignment with this change, corresponding metrics keywords have been adjusted.
+ The following table presents a comprehensive overview of the changes:
+
+ | Key Before v0.3.0 | Key After v0.3.0 |
+ |-------------------------------|--------------------------------|
+ | `MasterPushDataTime` | `PrimaryPushDataTime` |
+ | `MasterPushDataHandshakeTime` | `PrimaryPushDataHandshakeTime` |
+ | `MasterRegionStartTime` | `PrimaryRegionStartTime` |
+ | `MasterRegionFinishTime` | `PrimaryRegionFinishTime` |
+ | `SlavePushDataTime` | `ReplicaPushDataTime` |
+ | `SlavePushDataHandshakeTime` | `ReplicaPushDataHandshakeTime` |
+ | `SlaveRegionStartTime` | `ReplicaRegionStartTime` |
+ | `SlaveRegionFinishTime` | `ReplicaRegionFinishTime` |
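
For operators carrying customized dashboards across the upgrade, the mapping above can be applied mechanically to an exported dashboard JSON. Below is a hypothetical one-off helper, not part of Celeborn or this patch; the rename pairs come from the table, everything else (object name, I/O handling) is illustrative:

```scala
import java.nio.file.{Files, Paths}

// Hypothetical one-off migration helper (not part of Celeborn): rewrites
// pre-0.3.0 metric names in an exported Grafana dashboard JSON using the
// rename table above.
object DashboardMetricRename {
  // None of these names is a substring of another, so plain replace is safe.
  private val renames = Seq(
    "MasterPushDataTime" -> "PrimaryPushDataTime",
    "MasterPushDataHandshakeTime" -> "PrimaryPushDataHandshakeTime",
    "MasterRegionStartTime" -> "PrimaryRegionStartTime",
    "MasterRegionFinishTime" -> "PrimaryRegionFinishTime",
    "SlavePushDataTime" -> "ReplicaPushDataTime",
    "SlavePushDataHandshakeTime" -> "ReplicaPushDataHandshakeTime",
    "SlaveRegionStartTime" -> "ReplicaRegionStartTime",
    "SlaveRegionFinishTime" -> "ReplicaRegionFinishTime")

  def main(args: Array[String]): Unit = {
    val path = Paths.get(args(0)) // e.g. rss-dashboard.json
    val json = new String(Files.readAllBytes(path), "UTF-8")
    val migrated = renames.foldLeft(json) { case (acc, (oldKey, newKey)) =>
      acc.replace(oldKey, newKey)
    }
    Files.write(path, migrated.getBytes("UTF-8"))
  }
}
```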
diff --git a/docs/monitoring.md b/docs/monitoring.md
index efe93b4d5..92db7c922 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -134,9 +134,9 @@ These metrics are exposed by Celeborn worker.
     - The time for a worker to process openStream RPC and return StreamHandle.
   - FetchChunkTime
     - The time for a worker to fetch a chunk which is 8MB by default from a reduced partition.
-  - MasterPushDataTime
+  - PrimaryPushDataTime
     - The time for a worker to handle a pushData RPC sent from a celeborn client.
-  - SlavePushDataTime
+  - ReplicaPushDataTime
     - The time for a worker to handle a pushData RPC sent from a celeborn worker by replicating.
   - WriteDataFailCount
   - ReplicateDataFailCount
@@ -147,12 +147,12 @@ These metrics are exposed by Celeborn worker.
   - PushDataHandshakeFailCount
   - RegionStartFailCount
   - RegionFinishFailCount
-  - MasterPushDataHandshakeTime
-  - SlavePushDataHandshakeTime
-  - MasterRegionStartTime
-  - SlaveRegionStartTime
-  - MasterRegionFinishTime
-  - SlaveRegionFinishTime
+  - PrimaryPushDataHandshakeTime
+  - ReplicaPushDataHandshakeTime
+  - PrimaryRegionStartTime
+  - ReplicaRegionStartTime
+  - PrimaryRegionFinishTime
+  - ReplicaRegionFinishTime
   - TakeBufferTime
     - The time for a worker to take out a buffer from a disk flusher.
   - RegisteredShuffleCount
@@ -297,4 +297,4 @@ API path listed as below:
 | /listPartitionLocationInfo | worker | List all living PartitionLocation information in that worker. |
 | /unavailablePeers | worker | List the unavailable peers of the worker, this always means the worker connect to the peer failed. |
 | /isShutdown | worker | Show if the worker is during the process of shutdown. |
-| /isRegistered | worker | Show if the worker is registered to the master success. |
\ No newline at end of file
+| /isRegistered | worker | Show if the worker is registered to the master success. |
diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
index 95714fc58..11fd29197 100644
--- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
+++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
@@ -38,8 +38,8 @@ class PushDataTimeoutTest extends AnyFunSuite
override def beforeAll(): Unit = {
logInfo("test initialized, setup celeborn mini cluster")
val workerConf = Map(
- CelebornConf.TEST_CLIENT_PUSH_MASTER_DATA_TIMEOUT.key -> "true",
- CelebornConf.TEST_WORKER_PUSH_SLAVE_DATA_TIMEOUT.key -> "true")
+ CelebornConf.TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT.key -> "true",
+ CelebornConf.TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT.key -> "true")
     // required at least 4 workers, the reason behind this requirement is that when replication is
     // enabled, there is a possibility that two workers might be added to the excluded list due to
     // master/slave timeout issues, then there are not enough workers to do replication if available
@@ -65,7 +65,7 @@ class PushDataTimeoutTest extends AnyFunSuite
.set(s"spark.${CelebornConf.CLIENT_PUSH_DATA_TIMEOUT.key}", "5s")
.set(s"spark.celeborn.data.push.timeoutCheck.interval", "2s")
.set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}",
enabled.toString)
-
.set(s"spark.${CelebornConf.CLIENT_EXCLUDE_SLAVE_ON_FAILURE_ENABLED.key}",
"false")
+
.set(s"spark.${CelebornConf.CLIENT_EXCLUDE_REPLICA_ON_FAILURE_ENABLED.key}",
"false")
// make sure PushDataHandler.handlePushData be triggered
.set(s"spark.${CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key}", "5")
@@ -97,7 +97,7 @@ class PushDataTimeoutTest extends AnyFunSuite
.set(s"spark.${CelebornConf.CLIENT_PUSH_DATA_TIMEOUT.key}", "5s")
.set(s"spark.celeborn.data.push.timeoutCheck.interval", "2s")
.set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}",
enabled.toString)
-
.set(s"spark.${CelebornConf.CLIENT_EXCLUDE_SLAVE_ON_FAILURE_ENABLED.key}",
"false")
+
.set(s"spark.${CelebornConf.CLIENT_EXCLUDE_REPLICA_ON_FAILURE_ENABLED.key}",
"false")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val sqlResult = runsql(sparkSession)
@@ -123,7 +123,7 @@ class PushDataTimeoutTest extends AnyFunSuite
test("celeborn spark integration test - pushdata timeout will add to
pushExcludedWorkers") {
val sparkConf = new
SparkConf().setAppName("rss-demo").setMaster("local[2]")
.set(s"spark.${CelebornConf.CLIENT_PUSH_DATA_TIMEOUT.key}", "5s")
-
.set(s"spark.${CelebornConf.CLIENT_EXCLUDE_SLAVE_ON_FAILURE_ENABLED.key}",
"true")
+
.set(s"spark.${CelebornConf.CLIENT_EXCLUDE_REPLICA_ON_FAILURE_ENABLED.key}",
"true")
.set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "true")
val rssSparkSession = SparkSession.builder()
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 54f167e4c..09d2d2af6 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -58,8 +58,8 @@ class PushDataHandler extends BaseMessageHandler with Logging {
private var workerPartitionSplitEnabled: Boolean = _
private var workerReplicateRandomConnectionEnabled: Boolean = _
- private var testPushMasterDataTimeout: Boolean = _
- private var testPushSlaveDataTimeout: Boolean = _
+ private var testPushPrimaryDataTimeout: Boolean = _
+ private var testPushReplicaDataTimeout: Boolean = _
def init(worker: Worker): Unit = {
workerSource = worker.workerSource
@@ -79,8 +79,8 @@ class PushDataHandler extends BaseMessageHandler with Logging {
workerPartitionSplitEnabled = worker.conf.workerPartitionSplitEnabled
     workerReplicateRandomConnectionEnabled = worker.conf.workerReplicateRandomConnectionEnabled
- testPushMasterDataTimeout = worker.conf.testPushMasterDataTimeout
- testPushSlaveDataTimeout = worker.conf.testPushSlaveDataTimeout
+ testPushPrimaryDataTimeout = worker.conf.testPushPrimaryDataTimeout
+ testPushReplicaDataTimeout = worker.conf.testPushReplicaDataTimeout
logInfo(s"diskReserveSize $diskReserveSize")
}
@@ -131,12 +131,12 @@ class PushDataHandler extends BaseMessageHandler with Logging {
val isMaster = mode == PartitionLocation.Mode.MASTER
// For test
- if (isMaster && testPushMasterDataTimeout &&
+ if (isMaster && testPushPrimaryDataTimeout &&
!PushDataHandler.pushMasterDataTimeoutTested.getAndSet(true)) {
return
}
- if (!isMaster && testPushSlaveDataTimeout &&
+ if (!isMaster && testPushReplicaDataTimeout &&
!PushDataHandler.pushSlaveDataTimeoutTested.getAndSet(true)) {
return
}
@@ -146,13 +146,13 @@ class PushDataHandler extends BaseMessageHandler with Logging {
if (isMaster) {
new RpcResponseCallbackWithTimer(
workerSource,
- WorkerSource.MasterPushDataTime,
+ WorkerSource.PrimaryPushDataTime,
key,
callback)
} else {
new RpcResponseCallbackWithTimer(
workerSource,
- WorkerSource.SlavePushDataTime,
+ WorkerSource.ReplicaPushDataTime,
key,
callback)
}
@@ -394,24 +394,24 @@ class PushDataHandler extends BaseMessageHandler with Logging {
if (isMaster) {
new RpcResponseCallbackWithTimer(
workerSource,
- WorkerSource.MasterPushDataTime,
+ WorkerSource.PrimaryPushDataTime,
key,
callback)
} else {
new RpcResponseCallbackWithTimer(
workerSource,
- WorkerSource.SlavePushDataTime,
+ WorkerSource.ReplicaPushDataTime,
key,
callback)
}
// For test
- if (isMaster && testPushMasterDataTimeout &&
+ if (isMaster && testPushPrimaryDataTimeout &&
!PushDataHandler.pushMasterMergeDataTimeoutTested.getAndSet(true)) {
return
}
- if (!isMaster && testPushSlaveDataTimeout &&
+ if (!isMaster && testPushReplicaDataTimeout &&
!PushDataHandler.pushSlaveMergeDataTimeoutTested.getAndSet(true)) {
return
}
@@ -726,9 +726,9 @@ class PushDataHandler extends BaseMessageHandler with Logging {
val key = s"${pushData.requestId}"
if (isMaster) {
- workerSource.startTimer(WorkerSource.MasterPushDataTime, key)
+ workerSource.startTimer(WorkerSource.PrimaryPushDataTime, key)
} else {
- workerSource.startTimer(WorkerSource.SlavePushDataTime, key)
+ workerSource.startTimer(WorkerSource.ReplicaPushDataTime, key)
}
// find FileWriter responsible for the data
@@ -746,7 +746,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
pushData.requestId,
null,
location,
-        if (isMaster) WorkerSource.MasterPushDataTime else WorkerSource.SlavePushDataTime,
+        if (isMaster) WorkerSource.PrimaryPushDataTime else WorkerSource.ReplicaPushDataTime,
callback)
if (locationIsNull(
@@ -845,11 +845,11 @@ class PushDataHandler extends BaseMessageHandler with Logging {
val (workerSourceMaster, workerSourceSlave) =
messageType match {
case Type.PUSH_DATA_HAND_SHAKE =>
-          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+          (WorkerSource.PrimaryPushDataHandshakeTime, WorkerSource.ReplicaPushDataHandshakeTime)
         case Type.REGION_START =>
-          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+          (WorkerSource.PrimaryRegionStartTime, WorkerSource.ReplicaRegionStartTime)
         case Type.REGION_FINISH =>
-          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+          (WorkerSource.PrimaryRegionFinishTime, WorkerSource.ReplicaRegionFinishTime)
         case _ => throw new IllegalArgumentException(s"Not support $messageType yet")
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 6cbff783b..e29fbff03 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -41,15 +41,15 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste
addTimer(CommitFilesTime)
addTimer(ReserveSlotsTime)
addTimer(FlushDataTime)
- addTimer(MasterPushDataTime)
- addTimer(SlavePushDataTime)
+ addTimer(PrimaryPushDataTime)
+ addTimer(ReplicaPushDataTime)
- addTimer(MasterPushDataHandshakeTime)
- addTimer(SlavePushDataHandshakeTime)
- addTimer(MasterRegionStartTime)
- addTimer(SlaveRegionStartTime)
- addTimer(MasterRegionFinishTime)
- addTimer(SlaveRegionFinishTime)
+ addTimer(PrimaryPushDataHandshakeTime)
+ addTimer(ReplicaPushDataHandshakeTime)
+ addTimer(PrimaryRegionStartTime)
+ addTimer(ReplicaRegionStartTime)
+ addTimer(PrimaryRegionFinishTime)
+ addTimer(ReplicaRegionFinishTime)
addTimer(FetchChunkTime)
addTimer(OpenStreamTime)
@@ -72,8 +72,8 @@ object WorkerSource {
val FetchChunkTime = "FetchChunkTime"
// push data
- val MasterPushDataTime = "MasterPushDataTime"
- val SlavePushDataTime = "SlavePushDataTime"
+ val PrimaryPushDataTime = "PrimaryPushDataTime"
+ val ReplicaPushDataTime = "ReplicaPushDataTime"
val WriteDataFailCount = "WriteDataFailCount"
val ReplicateDataFailCount = "ReplicateDataFailCount"
val ReplicateDataWriteFailCount = "ReplicateDataWriteFailCount"
@@ -83,12 +83,12 @@ object WorkerSource {
val PushDataHandshakeFailCount = "PushDataHandshakeFailCount"
val RegionStartFailCount = "RegionStartFailCount"
val RegionFinishFailCount = "RegionFinishFailCount"
- val MasterPushDataHandshakeTime = "MasterPushDataHandshakeTime"
- val SlavePushDataHandshakeTime = "SlavePushDataHandshakeTime"
- val MasterRegionStartTime = "MasterRegionStartTime"
- val SlaveRegionStartTime = "SlaveRegionStartTime"
- val MasterRegionFinishTime = "MasterRegionFinishTime"
- val SlaveRegionFinishTime = "SlaveRegionFinishTime"
+ val PrimaryPushDataHandshakeTime = "PrimaryPushDataHandshakeTime"
+ val ReplicaPushDataHandshakeTime = "ReplicaPushDataHandshakeTime"
+ val PrimaryRegionStartTime = "PrimaryRegionStartTime"
+ val ReplicaRegionStartTime = "ReplicaRegionStartTime"
+ val PrimaryRegionFinishTime = "PrimaryRegionFinishTime"
+ val ReplicaRegionFinishTime = "ReplicaRegionFinishTime"
// flush
val TakeBufferTime = "TakeBufferTime"