This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new ade934cc5 [CELEBORN-827] Eliminate unnecessary chunksBeingTransferred
calculation
ade934cc5 is described below
commit ade934cc592104fd65ca45ff3d0a3196baad142b
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jul 24 15:31:57 2023 +0800
[CELEBORN-827] Eliminate unnecessary chunksBeingTransferred calculation
### What changes were proposed in this pull request?
Eliminate the `chunksBeingTransferred` calculation when
`celeborn.shuffle.io.maxChunksBeingTransferred` is not configured.
### Why are the changes needed?
I observed high CPU usage in the `ChunkStreamManager#chunksBeingTransferred`
calculation. We can eliminate the method call when no threshold is configured,
and investigate how to improve the method itself in the future.
<img width="1947" alt="image"
src="https://github.com/apache/incubator-celeborn/assets/26535726/412c6a41-c0ce-440c-ae99-4424cb8702d3">
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI and Review.
Closes #1749 from pan3793/CELEBORN-827.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit fa79b263a0069408e9cdabb1145c023017f54246)
Signed-off-by: Cheng Pan <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 6 +-
docs/configuration/network.md | 2 +-
.../service/deploy/worker/FetchHandler.scala | 87 ++++++++++++----------
3 files changed, 51 insertions(+), 44 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index a7d7843aa..ea79d461c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -466,7 +466,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def networkAllocatorVerboseMetric: Boolean =
get(NETWORK_MEMORY_ALLOCATOR_VERBOSE_METRIC)
- def shuffleIoMaxChunksBeingTransferred: Long = {
+ def shuffleIoMaxChunksBeingTransferred: Option[Long] = {
get(MAX_CHUNKS_BEING_TRANSFERRED)
}
@@ -1418,7 +1418,7 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("2m")
- val MAX_CHUNKS_BEING_TRANSFERRED: ConfigEntry[Long] =
+ val MAX_CHUNKS_BEING_TRANSFERRED: OptionalConfigEntry[Long] =
buildConf("celeborn.shuffle.io.maxChunksBeingTransferred")
.categories("network")
.doc("The max number of chunks allowed to be transferred at the same
time on shuffle service. Note " +
@@ -1427,7 +1427,7 @@ object CelebornConf extends Logging {
"`celeborn.<module>.io.retryWait`), if those limits are reached the
task will fail with fetch failure.")
.version("0.2.0")
.longConf
- .createWithDefault(Long.MaxValue)
+ .createOptional
val PUSH_TIMEOUT_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.<module>.push.timeoutCheck.interval")
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 52b39b483..e1001ee9b 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -49,5 +49,5 @@ license: |
| celeborn.rpc.dispatcher.threads | <undefined> | Threads number of
message dispatcher event loop | 0.3.0 |
| celeborn.rpc.io.threads | <undefined> | Netty IO thread number of
NettyRpcEnv to handle RPC request. The default threads number is the number of
runtime available processors. | 0.2.0 |
| celeborn.rpc.lookupTimeout | 30s | Timeout for RPC lookup operations. |
0.2.0 |
-| celeborn.shuffle.io.maxChunksBeingTransferred | 9223372036854775807 | The
max number of chunks allowed to be transferred at the same time on shuffle
service. Note that new incoming connections will be closed when the max number
is hit. The client will retry according to the shuffle retry configs (see
`celeborn.<module>.io.maxRetries` and `celeborn.<module>.io.retryWait`), if
those limits are reached the task will fail with fetch failure. | 0.2.0 |
+| celeborn.shuffle.io.maxChunksBeingTransferred | <undefined> | The max
number of chunks allowed to be transferred at the same time on shuffle service.
Note that new incoming connections will be closed when the max number is hit.
The client will retry according to the shuffle retry configs (see
`celeborn.<module>.io.maxRetries` and `celeborn.<module>.io.retryWait`), if
those limits are reached the task will fail with fetch failure. | 0.2.0 |
<!--end-include-->
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 950b5208a..9b4d08f8c 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -27,6 +27,7 @@ import com.google.common.base.Throwables
import io.netty.util.concurrent.{Future, GenericFutureListener}
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.CelebornConf.MAX_CHUNKS_BEING_TRANSFERRED
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{FileInfo, FileManagedBuffers}
import org.apache.celeborn.common.network.buffer.NioManagedBuffer
@@ -36,12 +37,15 @@ import
org.apache.celeborn.common.network.protocol.Message.Type
import org.apache.celeborn.common.network.server.BaseMessageHandler
import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
import org.apache.celeborn.common.protocol.PartitionType
-import org.apache.celeborn.common.util.ExceptionUtils
+import org.apache.celeborn.common.util.{ExceptionUtils, Utils}
import org.apache.celeborn.service.deploy.worker.storage.{ChunkStreamManager,
CreditStreamManager, PartitionFilesSorter, StorageManager}
class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
extends BaseMessageHandler with Logging {
- var chunkStreamManager = new ChunkStreamManager()
+
+ val chunkStreamManager = new ChunkStreamManager()
+ val maxChunkBeingTransferred: Option[Long] =
conf.shuffleIoMaxChunksBeingTransferred
+
val creditStreamManager = new CreditStreamManager(
conf.partitionReadBuffersMin,
conf.partitionReadBuffersMax,
@@ -214,46 +218,49 @@ class FetchHandler(val conf: CelebornConf, val
transportConf: TransportConf)
logTrace(s"Received req from
${NettyUtils.getRemoteAddress(client.getChannel)}" +
s" to fetch block ${req.streamChunkSlice}")
- val chunksBeingTransferred = chunkStreamManager.chunksBeingTransferred
- if (chunksBeingTransferred > conf.shuffleIoMaxChunksBeingTransferred) {
- val message = "Worker is too busy. The number of chunks being
transferred " +
- s"$chunksBeingTransferred exceeds
celeborn.shuffle.maxChunksBeingTransferred " +
- s"${conf.shuffleIoMaxChunksBeingTransferred}."
- logError(message)
- client.getChannel.writeAndFlush(new
ChunkFetchFailure(req.streamChunkSlice, message))
- } else {
- workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
- val fetchTimeMetric =
chunkStreamManager.getFetchTimeMetric(req.streamChunkSlice.streamId)
- val fetchBeginTime = System.nanoTime()
- try {
- val buf = chunkStreamManager.getChunk(
- req.streamChunkSlice.streamId,
- req.streamChunkSlice.chunkIndex,
- req.streamChunkSlice.offset,
- req.streamChunkSlice.len)
- chunkStreamManager.chunkBeingSent(req.streamChunkSlice.streamId)
- client.getChannel.writeAndFlush(new
ChunkFetchSuccess(req.streamChunkSlice, buf))
- .addListener(new GenericFutureListener[Future[_ >: Void]] {
- override def operationComplete(future: Future[_ >: Void]): Unit = {
- chunkStreamManager.chunkSent(req.streamChunkSlice.streamId)
- if (fetchTimeMetric != null) {
- fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)
- }
- workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME,
req.toString)
- }
- })
- } catch {
- case e: Exception =>
- logError(
- s"Error opening block ${req.streamChunkSlice} for request from " +
- NettyUtils.getRemoteAddress(client.getChannel),
- e)
- client.getChannel.writeAndFlush(new ChunkFetchFailure(
- req.streamChunkSlice,
- Throwables.getStackTraceAsString(e)))
- workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
+ maxChunkBeingTransferred.foreach { threshold =>
+ val chunksBeingTransferred = chunkStreamManager.chunksBeingTransferred
// take high cpu usage
+ if (chunksBeingTransferred > threshold) {
+ val message = "Worker is too busy. The number of chunks being
transferred " +
+ s"$chunksBeingTransferred exceeds
${MAX_CHUNKS_BEING_TRANSFERRED.key} " +
+ s"${Utils.bytesToString(threshold)}."
+ logError(message)
+ client.getChannel.writeAndFlush(new
ChunkFetchFailure(req.streamChunkSlice, message))
+ return
}
}
+
+ workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
+ val fetchTimeMetric =
chunkStreamManager.getFetchTimeMetric(req.streamChunkSlice.streamId)
+ val fetchBeginTime = System.nanoTime()
+ try {
+ val buf = chunkStreamManager.getChunk(
+ req.streamChunkSlice.streamId,
+ req.streamChunkSlice.chunkIndex,
+ req.streamChunkSlice.offset,
+ req.streamChunkSlice.len)
+ chunkStreamManager.chunkBeingSent(req.streamChunkSlice.streamId)
+ client.getChannel.writeAndFlush(new
ChunkFetchSuccess(req.streamChunkSlice, buf))
+ .addListener(new GenericFutureListener[Future[_ >: Void]] {
+ override def operationComplete(future: Future[_ >: Void]): Unit = {
+ chunkStreamManager.chunkSent(req.streamChunkSlice.streamId)
+ if (fetchTimeMetric != null) {
+ fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)
+ }
+ workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
+ }
+ })
+ } catch {
+ case e: Exception =>
+ logError(
+ s"Error opening block ${req.streamChunkSlice} for request from " +
+ NettyUtils.getRemoteAddress(client.getChannel),
+ e)
+ client.getChannel.writeAndFlush(new ChunkFetchFailure(
+ req.streamChunkSlice,
+ Throwables.getStackTraceAsString(e)))
+ workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
+ }
}
override def checkRegistered: Boolean = registered.get