Author: xuefu
Date: Tue Dec 2 14:59:24 2014
New Revision: 1642894
URL: http://svn.apache.org/r1642894
Log:
HIVE-8995: Find thread leak in RSC Tests [Spark Branch] (Rui via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1642894&r1=1642893&r2=1642894&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
Tue Dec 2 14:59:24 2014
@@ -54,7 +54,7 @@ public class HiveSparkClientFactory {
}
}
- private static Map<String, String> initiateSparkConf(Configuration hiveConf)
{
+ public static Map<String, String> initiateSparkConf(Configuration hiveConf) {
Map<String, String> sparkConf = new HashMap<String, String>();
// set default spark configurations.
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1642894&r1=1642893&r2=1642894&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
Tue Dec 2 14:59:24 2014
@@ -76,7 +76,6 @@ public class RemoteHiveSparkClient imple
private transient List<String> localFiles = new ArrayList<String>();
RemoteHiveSparkClient(Map<String, String> conf) throws IOException,
SparkException {
- SparkClientFactory.initialize(conf);
sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
remoteClient = SparkClientFactory.createClient(conf);
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java?rev=1642894&r1=1642893&r2=1642894&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
Tue Dec 2 14:59:24 2014
@@ -21,15 +21,19 @@ import com.google.common.base.Preconditi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.spark.client.SparkClientFactory;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Simple implementation of <i>SparkSessionManager</i>
@@ -41,7 +45,7 @@ public class SparkSessionManagerImpl imp
private static final Log LOG =
LogFactory.getLog(SparkSessionManagerImpl.class);
private Set<SparkSession> createdSessions;
- private boolean inited;
+ private AtomicBoolean inited = new AtomicBoolean(false);
private static SparkSessionManagerImpl instance;
@@ -74,13 +78,16 @@ public class SparkSessionManagerImpl imp
@Override
public void setup(HiveConf hiveConf) throws HiveException {
- LOG.info("Setting up the session manager.");
- init();
- }
-
- private void init() {
- createdSessions = Collections.synchronizedSet(new HashSet<SparkSession>());
- inited = true;
+ if (inited.compareAndSet(false, true)) {
+ LOG.info("Setting up the session manager.");
+ createdSessions = Collections.synchronizedSet(new
HashSet<SparkSession>());
+ Map<String, String> conf =
HiveSparkClientFactory.initiateSparkConf(hiveConf);
+ try {
+ SparkClientFactory.initialize(conf);
+ } catch (IOException e) {
+ throw new HiveException("Error initializing SparkClientFactory", e);
+ }
+ }
}
/**
@@ -92,9 +99,7 @@ public class SparkSessionManagerImpl imp
@Override
public SparkSession getSession(SparkSession existingSession, HiveConf conf,
boolean doOpen) throws HiveException {
- if (!inited) {
- init();
- }
+ setup(conf);
if (existingSession != null) {
if (canReuseSession(existingSession, conf)) {
@@ -178,6 +183,7 @@ public class SparkSessionManagerImpl imp
createdSessions.clear();
}
}
- inited = false;
+ inited.set(false);
+ SparkClientFactory.stop();
}
}
Modified:
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java?rev=1642894&r1=1642893&r2=1642894&view=diff
==============================================================================
---
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
(original)
+++
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
Tue Dec 2 14:59:24 2014
@@ -44,15 +44,17 @@ public final class SparkClientFactory {
* @param conf Map containing configuration parameters for the client.
*/
public static synchronized void initialize(Map<String, String> conf) throws
IOException {
- secret = akka.util.Crypt.generateSecureCookie();
+ if (!initialized) {
+ secret = akka.util.Crypt.generateSecureCookie();
- Map<String, String> akkaConf = Maps.newHashMap(conf);
- akkaConf.put(ClientUtils.CONF_KEY_SECRET, secret);
+ Map<String, String> akkaConf = Maps.newHashMap(conf);
+ akkaConf.put(ClientUtils.CONF_KEY_SECRET, secret);
- ClientUtils.ActorSystemInfo info = ClientUtils.createActorSystem(akkaConf);
- actorSystem = info.system;
- akkaUrl = info.url;
- initialized = true;
+ ClientUtils.ActorSystemInfo info =
ClientUtils.createActorSystem(akkaConf);
+ actorSystem = info.system;
+ akkaUrl = info.url;
+ initialized = true;
+ }
}
/** Stops the SparkClient library. */