This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new b8e6fa7  [SPARK-32738][CORE][2.4] Should reduce the number of active 
threads if fatal error happens in `Inbox.process`
b8e6fa7 is described below

commit b8e6fa7d86697ae9967764220c8220fad0f9d669
Author: Zhenhua Wang <wzh_...@163.com>
AuthorDate: Fri Sep 18 18:50:09 2020 -0500

    [SPARK-32738][CORE][2.4] Should reduce the number of active threads if 
fatal error happens in `Inbox.process`
    
    This is a backport for 
[pr#29580](https://github.com/apache/spark/pull/29580) to branch 2.4.
    
    ### What changes were proposed in this pull request?
    
    Processing for `ThreadSafeRpcEndpoint` is controlled by `numActiveThreads` 
in `Inbox`. Currently, if any fatal error happens during `Inbox.process`, 
`numActiveThreads` is not decremented. Other threads then cannot process messages 
in that inbox, which causes the endpoint to "hang". For other types of 
endpoints, we should also keep `numActiveThreads` correct.
    
    This problem is more serious in previous Spark 2.x versions, since the 
driver, executor, and block manager endpoints are all thread-safe endpoints.
    
    To fix this, we should reduce the number of active threads if a fatal error 
happens in `Inbox.process`.
    
    ### Why are the changes needed?
    
    `numActiveThreads` is not correct when a fatal error happens, which causes 
the problem described above.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add a new test.
    
    Closes #29764 from wzhfy/deal_with_fatal_error_2.4.
    
    Authored-by: Zhenhua Wang <wzh_...@163.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../scala/org/apache/spark/rpc/netty/Inbox.scala     | 20 ++++++++++++++++++++
 .../org/apache/spark/rpc/netty/InboxSuite.scala      | 14 ++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
index 44d2622..f6ff056 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
@@ -202,6 +202,16 @@ private[netty] class Inbox(
    * Calls action closure, and calls the endpoint's onError function in the 
case of exceptions.
    */
   private def safelyCall(endpoint: RpcEndpoint)(action: => Unit): Unit = {
+    def dealWithFatalError(fatal: Throwable): Unit = {
+      inbox.synchronized {
+        assert(numActiveThreads > 0, "The number of active threads should be 
positive.")
+        // Should reduce the number of active threads before throw the error.
+        numActiveThreads -= 1
+      }
+      logError(s"An error happened while processing message in the inbox for 
$endpointRef", fatal)
+      throw fatal
+    }
+
     try action catch {
       case NonFatal(e) =>
         try endpoint.onError(e) catch {
@@ -211,8 +221,18 @@ private[netty] class Inbox(
             } else {
               logError("Ignoring error", ee)
             }
+          case fatal: Throwable =>
+            dealWithFatalError(fatal)
         }
+      case fatal: Throwable =>
+        dealWithFatalError(fatal)
     }
   }
 
+  // exposed only for testing
+  def getNumActiveThreads: Int = {
+    inbox.synchronized {
+      inbox.numActiveThreads
+    }
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala 
b/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala
index e553956..9edc985 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala
@@ -147,4 +147,18 @@ class InboxSuite extends SparkFunSuite {
 
     endpoint.verifySingleOnNetworkErrorMessage(cause, remoteAddress)
   }
+
+  test("SPARK-32738: should reduce the number of active threads when fatal 
error happens") {
+    val endpoint = mock(classOf[TestRpcEndpoint])
+    when(endpoint.receive).thenThrow(new OutOfMemoryError())
+
+    val endpointRef = mock(classOf[NettyRpcEndpointRef])
+    val dispatcher = mock(classOf[Dispatcher])
+    val inbox = new Inbox(endpointRef, endpoint)
+    inbox.post(OneWayMessage(null, "hi"))
+    intercept[OutOfMemoryError] {
+      inbox.process(dispatcher)
+    }
+    assert(inbox.getNumActiveThreads == 0)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to