Repository: spark
Updated Branches:
  refs/heads/branch-1.6 175681914 -> d821fae0e


[SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming

Move Py4jCallbackConnectionCleaner to Streaming because the callback server 
starts only in StreamingContext.
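The cleaner itself is a self-rescheduling daemon Timer. As a standalone
illustration of that pattern (independent of Py4j; the class name and the
work callable below are hypothetical stand-ins, only the daemonized
30-second Timer idea is taken from the patch):

    from threading import RLock, Timer

    class PeriodicTask(object):
        """Runs `work` every `interval` seconds on a daemon Timer."""

        def __init__(self, interval, work):
            self._interval = interval
            self._work = work        # callable invoked on every tick
            self._stopped = False
            self._timer = None
            self._lock = RLock()

        def start(self):
            def tick():
                self._work()
                self._schedule(tick)  # reschedule after each run
            self._schedule(tick)

        def _schedule(self, f):
            with self._lock:
                if not self._stopped:
                    self._timer = Timer(self._interval, f)
                    self._timer.daemon = True  # don't block interpreter exit
                    self._timer.start()

        def stop(self):
            with self._lock:
                self._stopped = True
                if self._timer:
                    self._timer.cancel()
                    self._timer = None

    # e.g. PeriodicTask(30.0, scan_connections).start()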

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10621 from zsxwing/SPARK-12617-2.

(cherry picked from commit 1e6648d62fb82b708ea54c51cd23bfe4f542856e)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d821fae0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d821fae0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d821fae0

Branch: refs/heads/branch-1.6
Commit: d821fae0ecca6393d3632977797d72ba594d26a9
Parents: 1756819
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Jan 6 12:03:01 2016 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Jan 6 12:03:10 2016 -0800

----------------------------------------------------------------------
 python/pyspark/context.py           | 61 -------------------------------
 python/pyspark/streaming/context.py | 63 ++++++++++++++++++++++++++++++++
 2 files changed, 63 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d821fae0/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5e4aeac..529d16b 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -54,64 +54,6 @@ DEFAULT_CONFIGS = {
 }
 
 
-class Py4jCallbackConnectionCleaner(object):
-
-    """
-    A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
-    It will scan all callback connections every 30 seconds and close the dead connections.
-    """
-
-    def __init__(self, gateway):
-        self._gateway = gateway
-        self._stopped = False
-        self._timer = None
-        self._lock = RLock()
-
-    def start(self):
-        if self._stopped:
-            return
-
-        def clean_closed_connections():
-            from py4j.java_gateway import quiet_close, quiet_shutdown
-
-            callback_server = self._gateway._callback_server
-            with callback_server.lock:
-                try:
-                    closed_connections = []
-                    for connection in callback_server.connections:
-                        if not connection.isAlive():
-                            quiet_close(connection.input)
-                            quiet_shutdown(connection.socket)
-                            quiet_close(connection.socket)
-                            closed_connections.append(connection)
-
-                    for closed_connection in closed_connections:
-                        callback_server.connections.remove(closed_connection)
-                except Exception:
-                    import traceback
-                    traceback.print_exc()
-
-            self._start_timer(clean_closed_connections)
-
-        self._start_timer(clean_closed_connections)
-
-    def _start_timer(self, f):
-        from threading import Timer
-
-        with self._lock:
-            if not self._stopped:
-                self._timer = Timer(30.0, f)
-                self._timer.daemon = True
-                self._timer.start()
-
-    def stop(self):
-        with self._lock:
-            self._stopped = True
-            if self._timer:
-                self._timer.cancel()
-                self._timer = None
-
-
 class SparkContext(object):
 
     """
@@ -126,7 +68,6 @@ class SparkContext(object):
     _active_spark_context = None
     _lock = RLock()
     _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
-    _py4j_cleaner = None
 
     PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
 
@@ -303,8 +244,6 @@ class SparkContext(object):
             if not SparkContext._gateway:
                 SparkContext._gateway = gateway or launch_gateway()
                 SparkContext._jvm = SparkContext._gateway.jvm
-                _py4j_cleaner = Py4jCallbackConnectionCleaner(SparkContext._gateway)
-                _py4j_cleaner.start()
 
             if instance:
                 if (SparkContext._active_spark_context and

http://git-wip-us.apache.org/repos/asf/spark/blob/d821fae0/python/pyspark/streaming/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 06346e5..874cb3f 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -19,6 +19,7 @@ from __future__ import print_function
 
 import os
 import sys
+from threading import RLock, Timer
 
 from py4j.java_gateway import java_import, JavaObject
 
@@ -32,6 +33,63 @@ from pyspark.streaming.util import TransformFunction, TransformFunctionSerialize
 __all__ = ["StreamingContext"]
 
 
+class Py4jCallbackConnectionCleaner(object):
+
+    """
+    A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
+    It will scan all callback connections every 30 seconds and close the dead connections.
+    """
+
+    def __init__(self, gateway):
+        self._gateway = gateway
+        self._stopped = False
+        self._timer = None
+        self._lock = RLock()
+
+    def start(self):
+        if self._stopped:
+            return
+
+        def clean_closed_connections():
+            from py4j.java_gateway import quiet_close, quiet_shutdown
+
+            callback_server = self._gateway._callback_server
+            if callback_server:
+                with callback_server.lock:
+                    try:
+                        closed_connections = []
+                        for connection in callback_server.connections:
+                            if not connection.isAlive():
+                                quiet_close(connection.input)
+                                quiet_shutdown(connection.socket)
+                                quiet_close(connection.socket)
+                                closed_connections.append(connection)
+
+                        for closed_connection in closed_connections:
+                            callback_server.connections.remove(closed_connection)
+                    except Exception:
+                        import traceback
+                        traceback.print_exc()
+
+            self._start_timer(clean_closed_connections)
+
+        self._start_timer(clean_closed_connections)
+
+    def _start_timer(self, f):
+        with self._lock:
+            if not self._stopped:
+                self._timer = Timer(30.0, f)
+                self._timer.daemon = True
+                self._timer.start()
+
+    def stop(self):
+        with self._lock:
+            self._stopped = True
+            if self._timer:
+                self._timer.cancel()
+                self._timer = None
+
+
 class StreamingContext(object):
     """
     Main entry point for Spark Streaming functionality. A StreamingContext
@@ -47,6 +105,9 @@ class StreamingContext(object):
     # Reference to a currently active StreamingContext
     _activeContext = None
 
+    # A cleaner to clean leak sockets of callback server every 30 seconds
+    _py4j_cleaner = None
+
     def __init__(self, sparkContext, batchDuration=None, jssc=None):
         """
         Create a new StreamingContext.
@@ -95,6 +156,8 @@ class StreamingContext(object):
             jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
             # update the port of CallbackClient with real port
             gw.jvm.PythonDStream.updatePythonGatewayPort(jgws, gw._python_proxy_port)
+            _py4j_cleaner = Py4jCallbackConnectionCleaner(gw)
+            _py4j_cleaner.start()
 
         # register serializer for TransformFunction
         # it happens before creating SparkContext when loading from checkpointing

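After this change, the cleaner is created and started as a side effect of
StreamingContext initialization, since that is where the Py4j callback
server comes up. A minimal sketch of a driver program that would exercise
it (host, port, and batch duration are hypothetical):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="CleanerDemo")
    ssc = StreamingContext(sc, 1)  # starts the callback server and the cleaner
    lines = ssc.socketTextStream("localhost", 9999)
    lines.pprint()
    ssc.start()
    ssc.awaitTermination()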
