Author: xuefu
Date: Tue Dec  2 14:59:24 2014
New Revision: 1642894

URL: http://svn.apache.org/r1642894
Log:
HIVE-8995: Find thread leak in RSC Tests [Spark Branch] (Rui via Xuefu)

Modified:
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
    
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1642894&r1=1642893&r2=1642894&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
 Tue Dec  2 14:59:24 2014
@@ -54,7 +54,7 @@ public class HiveSparkClientFactory {
     }
   }
 
-  private static Map<String, String> initiateSparkConf(Configuration hiveConf) 
{
+  public static Map<String, String> initiateSparkConf(Configuration hiveConf) {
     Map<String, String> sparkConf = new HashMap<String, String>();
 
     // set default spark configurations.

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1642894&r1=1642893&r2=1642894&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
 Tue Dec  2 14:59:24 2014
@@ -76,7 +76,6 @@ public class RemoteHiveSparkClient imple
   private transient List<String> localFiles = new ArrayList<String>();
 
   RemoteHiveSparkClient(Map<String, String> conf) throws IOException, 
SparkException {
-    SparkClientFactory.initialize(conf);
     sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
     remoteClient = SparkClientFactory.createClient(conf);
   }

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java?rev=1642894&r1=1642893&r2=1642894&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
 Tue Dec  2 14:59:24 2014
@@ -21,15 +21,19 @@ import com.google.common.base.Preconditi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.spark.client.SparkClientFactory;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Simple implementation of <i>SparkSessionManager</i>
@@ -41,7 +45,7 @@ public class SparkSessionManagerImpl imp
   private static final Log LOG = 
LogFactory.getLog(SparkSessionManagerImpl.class);
 
   private Set<SparkSession> createdSessions;
-  private boolean inited;
+  private AtomicBoolean inited = new AtomicBoolean(false);
 
   private static SparkSessionManagerImpl instance;
 
@@ -74,13 +78,16 @@ public class SparkSessionManagerImpl imp
 
   @Override
   public void setup(HiveConf hiveConf) throws HiveException {
-    LOG.info("Setting up the session manager.");
-    init();
-  }
-
-  private void init() {
-    createdSessions = Collections.synchronizedSet(new HashSet<SparkSession>());
-    inited = true;
+    if (inited.compareAndSet(false, true)) {
+      LOG.info("Setting up the session manager.");
+      createdSessions = Collections.synchronizedSet(new 
HashSet<SparkSession>());
+      Map<String, String> conf = 
HiveSparkClientFactory.initiateSparkConf(hiveConf);
+      try {
+        SparkClientFactory.initialize(conf);
+      } catch (IOException e) {
+        throw new HiveException("Error initializing SparkClientFactory", e);
+      }
+    }
   }
 
   /**
@@ -92,9 +99,7 @@ public class SparkSessionManagerImpl imp
   @Override
   public SparkSession getSession(SparkSession existingSession, HiveConf conf,
       boolean doOpen) throws HiveException {
-    if (!inited) {
-      init();
-    }
+    setup(conf);
 
     if (existingSession != null) {
       if (canReuseSession(existingSession, conf)) {
@@ -178,6 +183,7 @@ public class SparkSessionManagerImpl imp
         createdSessions.clear();
       }
     }
-    inited = false;
+    inited.set(false);
+    SparkClientFactory.stop();
   }
 }

Modified: 
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java?rev=1642894&r1=1642893&r2=1642894&view=diff
==============================================================================
--- 
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
 (original)
+++ 
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
 Tue Dec  2 14:59:24 2014
@@ -44,15 +44,17 @@ public final class SparkClientFactory {
    * @param conf Map containing configuration parameters for the client.
    */
   public static synchronized void initialize(Map<String, String> conf) throws 
IOException {
-    secret = akka.util.Crypt.generateSecureCookie();
+    if (!initialized) {
+      secret = akka.util.Crypt.generateSecureCookie();
 
-    Map<String, String> akkaConf = Maps.newHashMap(conf);
-    akkaConf.put(ClientUtils.CONF_KEY_SECRET, secret);
+      Map<String, String> akkaConf = Maps.newHashMap(conf);
+      akkaConf.put(ClientUtils.CONF_KEY_SECRET, secret);
 
-    ClientUtils.ActorSystemInfo info = ClientUtils.createActorSystem(akkaConf);
-    actorSystem = info.system;
-    akkaUrl = info.url;
-    initialized = true;
+      ClientUtils.ActorSystemInfo info = 
ClientUtils.createActorSystem(akkaConf);
+      actorSystem = info.system;
+      akkaUrl = info.url;
+      initialized = true;
+    }
   }
 
   /** Stops the SparkClient library. */


Reply via email to