This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new ade934cc5 [CELEBORN-827] Eliminate unnecessary chunksBeingTransferred 
calculation
ade934cc5 is described below

commit ade934cc592104fd65ca45ff3d0a3196baad142b
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jul 24 15:31:57 2023 +0800

    [CELEBORN-827] Eliminate unnecessary chunksBeingTransferred calculation
    
    ### What changes were proposed in this pull request?
    
    Eliminate `chunksBeingTransferred` calculation when 
`celeborn.shuffle.io.maxChunksBeingTransferred` is not configured
    
    ### Why are the changes needed?
    
    I observed high CPU usage on `ChunkStreamManager#chunksBeingTransferred` 
calculation. We can eliminate the method call if no threshold is configured, 
and investigate how to improve the method itself in the future.
    
    <img width="1947" alt="image" 
src="https://github.com/apache/incubator-celeborn/assets/26535726/412c6a41-c0ce-440c-ae99-4424cb8702d3">
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI and Review.
    
    Closes #1749 from pan3793/CELEBORN-827.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit fa79b263a0069408e9cdabb1145c023017f54246)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  |  6 +-
 docs/configuration/network.md                      |  2 +-
 .../service/deploy/worker/FetchHandler.scala       | 87 ++++++++++++----------
 3 files changed, 51 insertions(+), 44 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index a7d7843aa..ea79d461c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -466,7 +466,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
 
   def networkAllocatorVerboseMetric: Boolean = 
get(NETWORK_MEMORY_ALLOCATOR_VERBOSE_METRIC)
 
-  def shuffleIoMaxChunksBeingTransferred: Long = {
+  def shuffleIoMaxChunksBeingTransferred: Option[Long] = {
     get(MAX_CHUNKS_BEING_TRANSFERRED)
   }
 
@@ -1418,7 +1418,7 @@ object CelebornConf extends Logging {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("2m")
 
-  val MAX_CHUNKS_BEING_TRANSFERRED: ConfigEntry[Long] =
+  val MAX_CHUNKS_BEING_TRANSFERRED: OptionalConfigEntry[Long] =
     buildConf("celeborn.shuffle.io.maxChunksBeingTransferred")
       .categories("network")
       .doc("The max number of chunks allowed to be transferred at the same 
time on shuffle service. Note " +
@@ -1427,7 +1427,7 @@ object CelebornConf extends Logging {
         "`celeborn.<module>.io.retryWait`), if those limits are reached the 
task will fail with fetch failure.")
       .version("0.2.0")
       .longConf
-      .createWithDefault(Long.MaxValue)
+      .createOptional
 
   val PUSH_TIMEOUT_CHECK_INTERVAL: ConfigEntry[Long] =
     buildConf("celeborn.<module>.push.timeoutCheck.interval")
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 52b39b483..e1001ee9b 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -49,5 +49,5 @@ license: |
 | celeborn.rpc.dispatcher.threads | &lt;undefined&gt; | Threads number of 
message dispatcher event loop | 0.3.0 | 
 | celeborn.rpc.io.threads | &lt;undefined&gt; | Netty IO thread number of 
NettyRpcEnv to handle RPC request. The default threads number is the number of 
runtime available processors. | 0.2.0 | 
 | celeborn.rpc.lookupTimeout | 30s | Timeout for RPC lookup operations. | 
0.2.0 | 
-| celeborn.shuffle.io.maxChunksBeingTransferred | 9223372036854775807 | The 
max number of chunks allowed to be transferred at the same time on shuffle 
service. Note that new incoming connections will be closed when the max number 
is hit. The client will retry according to the shuffle retry configs (see 
`celeborn.<module>.io.maxRetries` and `celeborn.<module>.io.retryWait`), if 
those limits are reached the task will fail with fetch failure. | 0.2.0 | 
+| celeborn.shuffle.io.maxChunksBeingTransferred | &lt;undefined&gt; | The max 
number of chunks allowed to be transferred at the same time on shuffle service. 
Note that new incoming connections will be closed when the max number is hit. 
The client will retry according to the shuffle retry configs (see 
`celeborn.<module>.io.maxRetries` and `celeborn.<module>.io.retryWait`), if 
those limits are reached the task will fail with fetch failure. | 0.2.0 | 
 <!--end-include-->
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 950b5208a..9b4d08f8c 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -27,6 +27,7 @@ import com.google.common.base.Throwables
 import io.netty.util.concurrent.{Future, GenericFutureListener}
 
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.CelebornConf.MAX_CHUNKS_BEING_TRANSFERRED
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{FileInfo, FileManagedBuffers}
 import org.apache.celeborn.common.network.buffer.NioManagedBuffer
@@ -36,12 +37,15 @@ import 
org.apache.celeborn.common.network.protocol.Message.Type
 import org.apache.celeborn.common.network.server.BaseMessageHandler
 import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
 import org.apache.celeborn.common.protocol.PartitionType
-import org.apache.celeborn.common.util.ExceptionUtils
+import org.apache.celeborn.common.util.{ExceptionUtils, Utils}
 import org.apache.celeborn.service.deploy.worker.storage.{ChunkStreamManager, 
CreditStreamManager, PartitionFilesSorter, StorageManager}
 
 class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
   extends BaseMessageHandler with Logging {
-  var chunkStreamManager = new ChunkStreamManager()
+
+  val chunkStreamManager = new ChunkStreamManager()
+  val maxChunkBeingTransferred: Option[Long] = 
conf.shuffleIoMaxChunksBeingTransferred
+
   val creditStreamManager = new CreditStreamManager(
     conf.partitionReadBuffersMin,
     conf.partitionReadBuffersMax,
@@ -214,46 +218,49 @@ class FetchHandler(val conf: CelebornConf, val 
transportConf: TransportConf)
     logTrace(s"Received req from 
${NettyUtils.getRemoteAddress(client.getChannel)}" +
       s" to fetch block ${req.streamChunkSlice}")
 
-    val chunksBeingTransferred = chunkStreamManager.chunksBeingTransferred
-    if (chunksBeingTransferred > conf.shuffleIoMaxChunksBeingTransferred) {
-      val message = "Worker is too busy. The number of chunks being 
transferred " +
-        s"$chunksBeingTransferred exceeds 
celeborn.shuffle.maxChunksBeingTransferred " +
-        s"${conf.shuffleIoMaxChunksBeingTransferred}."
-      logError(message)
-      client.getChannel.writeAndFlush(new 
ChunkFetchFailure(req.streamChunkSlice, message))
-    } else {
-      workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
-      val fetchTimeMetric = 
chunkStreamManager.getFetchTimeMetric(req.streamChunkSlice.streamId)
-      val fetchBeginTime = System.nanoTime()
-      try {
-        val buf = chunkStreamManager.getChunk(
-          req.streamChunkSlice.streamId,
-          req.streamChunkSlice.chunkIndex,
-          req.streamChunkSlice.offset,
-          req.streamChunkSlice.len)
-        chunkStreamManager.chunkBeingSent(req.streamChunkSlice.streamId)
-        client.getChannel.writeAndFlush(new 
ChunkFetchSuccess(req.streamChunkSlice, buf))
-          .addListener(new GenericFutureListener[Future[_ >: Void]] {
-            override def operationComplete(future: Future[_ >: Void]): Unit = {
-              chunkStreamManager.chunkSent(req.streamChunkSlice.streamId)
-              if (fetchTimeMetric != null) {
-                fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)
-              }
-              workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, 
req.toString)
-            }
-          })
-      } catch {
-        case e: Exception =>
-          logError(
-            s"Error opening block ${req.streamChunkSlice} for request from " +
-              NettyUtils.getRemoteAddress(client.getChannel),
-            e)
-          client.getChannel.writeAndFlush(new ChunkFetchFailure(
-            req.streamChunkSlice,
-            Throwables.getStackTraceAsString(e)))
-          workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
+    maxChunkBeingTransferred.foreach { threshold =>
+      val chunksBeingTransferred = chunkStreamManager.chunksBeingTransferred 
// take high cpu usage
+      if (chunksBeingTransferred > threshold) {
+        val message = "Worker is too busy. The number of chunks being 
transferred " +
+          s"$chunksBeingTransferred exceeds 
${MAX_CHUNKS_BEING_TRANSFERRED.key} " +
+          s"${Utils.bytesToString(threshold)}."
+        logError(message)
+        client.getChannel.writeAndFlush(new 
ChunkFetchFailure(req.streamChunkSlice, message))
+        return
       }
     }
+
+    workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
+    val fetchTimeMetric = 
chunkStreamManager.getFetchTimeMetric(req.streamChunkSlice.streamId)
+    val fetchBeginTime = System.nanoTime()
+    try {
+      val buf = chunkStreamManager.getChunk(
+        req.streamChunkSlice.streamId,
+        req.streamChunkSlice.chunkIndex,
+        req.streamChunkSlice.offset,
+        req.streamChunkSlice.len)
+      chunkStreamManager.chunkBeingSent(req.streamChunkSlice.streamId)
+      client.getChannel.writeAndFlush(new 
ChunkFetchSuccess(req.streamChunkSlice, buf))
+        .addListener(new GenericFutureListener[Future[_ >: Void]] {
+          override def operationComplete(future: Future[_ >: Void]): Unit = {
+            chunkStreamManager.chunkSent(req.streamChunkSlice.streamId)
+            if (fetchTimeMetric != null) {
+              fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)
+            }
+            workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
+          }
+        })
+    } catch {
+      case e: Exception =>
+        logError(
+          s"Error opening block ${req.streamChunkSlice} for request from " +
+            NettyUtils.getRemoteAddress(client.getChannel),
+          e)
+        client.getChannel.writeAndFlush(new ChunkFetchFailure(
+          req.streamChunkSlice,
+          Throwables.getStackTraceAsString(e)))
+        workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
+    }
   }
 
   override def checkRegistered: Boolean = registered.get

Reply via email to