Author: brock
Date: Mon Aug 18 18:47:10 2014
New Revision: 1618702
URL: http://svn.apache.org/r1618702
Log:
HIVE-7606 - Design SparkSession, SparkSessionManager [Spark Branch] (Venki Korukanti via Brock)
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/
hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1618702&r1=1618701&r2=1618702&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
Mon Aug 18 18:47:10 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.s
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
@@ -28,10 +29,15 @@ import org.apache.hadoop.hive.ql.QueryPl
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
@@ -50,18 +56,28 @@ public class SparkTask extends Task<Spar
public int execute(DriverContext driverContext) {
int rc = 1;
- SparkClient client = null;
+ SparkSession sparkSession = null;
+ SparkSessionManager sparkSessionManager = null;
try {
configureNumberOfReducers();
- client = SparkClient.getInstance(driverContext.getCtx().getConf());
- rc = client.execute(driverContext, getWork());
+ sparkSessionManager = SparkSessionManagerImpl.getInstance();
+ sparkSession = SessionState.get().getSparkSession();
+ sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
+ SessionState.get().setSparkSession(sparkSession);
+
+ rc = sparkSession.submit(driverContext, getWork());
} catch (Exception e) {
LOG.error("Failed to execute spark task.", e);
return 1;
}
finally {
- if (client != null) {
+ if (sparkSession != null && sparkSessionManager != null) {
rc = close(rc);
+ try {
+ sparkSessionManager.returnSession(sparkSession);
+ } catch(HiveException ex) {
+ LOG.error("Failed to return the session to SessionManager", ex);
+ }
}
}
return rc;
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java?rev=1618702&view=auto
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
(added)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
Mon Aug 18 18:47:10 2014
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+public interface SparkSession {
+ /**
+ * Initializes a Spark session for DAG execution.
+ */
+ public void open(HiveConf conf);
+
+ /**
+ * Submit given <i>sparkWork</i> to SparkClient
+ * @param driverContext
+ * @param sparkWork
+ */
+ public int submit(DriverContext driverContext, SparkWork sparkWork);
+
+ /**
+ * Is the session open and ready to submit jobs?
+ */
+ public boolean isOpen();
+
+ /**
+ * Return configuration.
+ */
+ public HiveConf getConf();
+
+ /**
+ * Return session id.
+ */
+ public String getSessionId();
+
+ /**
+ * Close session and release resources
+ */
+ public void close();
+}
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java?rev=1618702&view=auto
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
(added)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
Mon Aug 18 18:47:10 2014
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.spark.SparkClient;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+import java.util.UUID;
+
+/**
+ * Simple implementation of <i>SparkSession</i> which currently just submits jobs to
+ * SparkClient which is shared by all SparkSession instances.
+ */
+public class SparkSessionImpl implements SparkSession {
+ private HiveConf conf;
+ private boolean isOpen;
+ private final String sessionId;
+
+ public SparkSessionImpl() {
+ sessionId = makeSessionId();
+ }
+
+ @Override
+ public void open(HiveConf conf) {
+ this.conf = conf;
+ isOpen = true;
+ }
+
+ @Override
+ public int submit(DriverContext driverContext, SparkWork sparkWork) {
+ Preconditions.checkState(isOpen, "Session is not open. Can't submit jobs.");
+ return SparkClient.getInstance(driverContext.getCtx().getConf())
+ .execute(driverContext, sparkWork);
+ }
+
+ @Override
+ public boolean isOpen() {
+ return isOpen;
+ }
+
+ @Override
+ public HiveConf getConf() {
+ return conf;
+ }
+
+ @Override
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ @Override
+ public void close() {
+ isOpen = false;
+ }
+
+ public static String makeSessionId() {
+ return UUID.randomUUID().toString();
+ }
+}
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java?rev=1618702&view=auto
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java
(added)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java
Mon Aug 18 18:47:10 2014
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * Defines interface for managing multiple SparkSessions in Hive when multiple users
+ * are executing queries simultaneously on Spark execution engine.
+ */
+public interface SparkSessionManager {
+ /**
+ * Initialize based on given configuration.
+ *
+ * @param hiveConf
+ */
+ public void setup(HiveConf hiveConf) throws HiveException;
+
+ /**
+ * Get a valid SparkSession. First try to check if the existing session is reusable
+ * based on the given <i>conf</i>. If not, release <i>existingSession</i> and return
+ * a new session based on session manager criteria and <i>conf</i>.
+ *
+ * @param existingSession Existing session (can be null)
+ * @param conf
+ * @param doOpen Should the session be opened before returning?
+ * @return
+ */
+ public SparkSession getSession(SparkSession existingSession, HiveConf conf,
+ boolean doOpen) throws HiveException;
+
+ /**
+ * Return the given <i>sparkSession</i> to pool. This is used when the client
+ * still holds references to session and may want to reuse it in future.
+ * When the client wants to reuse the session, it should pass it to the <i>getSession</i> method.
+ */
+ public void returnSession(SparkSession sparkSession) throws HiveException;
+
+ /**
+ * Close the given session and return it to pool. This is used when the client
+ * no longer needs a SparkSession.
+ */
+ public void closeSession(SparkSession sparkSession) throws HiveException;
+
+ /**
+ * Shutdown the session manager. Also closing up SparkSessions in pool.
+ */
+ public void shutdown();
+}
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java?rev=1618702&view=auto
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
(added)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
Mon Aug 18 18:47:10 2014
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Simple implementation of <i>SparkSessionManager</i>
+ * - returns SparkSession when requested through <i>getSession</i> and keeps track of
+ * created sessions. Currently there is no limit on the number of sessions.
+ * - SparkSession is reused if the user name in the new conf and the user name in the
+ * session conf match.
+ */
+public class SparkSessionManagerImpl implements SparkSessionManager {
+ private static final Log LOG = LogFactory.getLog(SparkSessionManagerImpl.class);
+
+ private Set<SparkSession> createdSessions;
+ private boolean inited;
+
+ private static SparkSessionManagerImpl instance;
+
+ static {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ if (instance != null) {
+ instance.shutdown();
+ }
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ });
+ }
+
+ public synchronized static SparkSessionManagerImpl getInstance()
+ throws HiveException {
+ if (instance == null) {
+ instance = new SparkSessionManagerImpl();
+ }
+
+ return instance;
+ }
+
+ private SparkSessionManagerImpl() {
+ }
+
+ @Override
+ public void setup(HiveConf hiveConf) throws HiveException {
+ LOG.info("Setting up the session manager.");
+ init();
+ }
+
+ private void init() {
+ createdSessions = Collections.synchronizedSet(new HashSet<SparkSession>());
+ inited = true;
+ }
+
+ /**
+ * If the <i>existingSession</i> can be reused return it.
+ * Otherwise
+ * - close it and remove it from the list.
+ * - create a new session and add it to the list.
+ */
+ @Override
+ public SparkSession getSession(SparkSession existingSession, HiveConf conf,
+ boolean doOpen) throws HiveException {
+ if (!inited) {
+ init();
+ }
+
+ if (existingSession != null) {
+ if (canReuseSession(existingSession, conf)) {
+ // Open the session if it is closed.
+ if (!existingSession.isOpen() && doOpen) {
+ existingSession.open(conf);
+ }
+
+ Preconditions.checkState(createdSessions.contains(existingSession));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Existing session (%s) is reused.",
+ existingSession.getSessionId()));
+ }
+ return existingSession;
+ } else {
+ // Close the session, as the client is holding onto a session that can't be used
+ // by the client anymore.
+ closeSession(existingSession);
+ }
+ }
+
+ SparkSession sparkSession = new SparkSessionImpl();
+ createdSessions.add(sparkSession);
+
+ if (doOpen) {
+ sparkSession.open(conf);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("New session (%s) is created.", sparkSession.getSessionId()));
+ }
+ return sparkSession;
+ }
+
+ /**
+ * Currently we only match the userNames in existingSession conf and given conf.
+ */
+ private boolean canReuseSession(SparkSession existingSession, HiveConf conf) throws HiveException {
+ try {
+ UserGroupInformation newUgi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+ String newUserName = ShimLoader.getHadoopShims().getShortUserName(newUgi);
+
+ UserGroupInformation ugiInSession =
+ ShimLoader.getHadoopShims().getUGIForConf(existingSession.getConf());
+ String userNameInSession = ShimLoader.getHadoopShims().getShortUserName(ugiInSession);
+
+ return newUserName.equals(userNameInSession);
+ } catch(Exception ex) {
+ throw new HiveException("Failed to get user info from HiveConf.", ex);
+ }
+ }
+
+ @Override
+ public void returnSession(SparkSession sparkSession) throws HiveException {
+ // In this particular SparkSessionManager implementation, we don't recycle
+ // returned sessions. References to session are still valid.
+ }
+
+ @Override
+ public void closeSession(SparkSession sparkSession) throws HiveException {
+ if (sparkSession == null) {
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Closing session (%s).", sparkSession.getSessionId()));
+ }
+ sparkSession.close();
+ createdSessions.remove(sparkSession);
+ }
+
+ @Override
+ public void shutdown() {
+ LOG.info("Closing the session manager.");
+ if (createdSessions != null) {
+ synchronized(createdSessions) {
+ Iterator<SparkSession> it = createdSessions.iterator();
+ while (it.hasNext()) {
+ SparkSession session = it.next();
+ session.close();
+ }
+ createdSessions.clear();
+ }
+ }
+ inited = false;
+ }
+}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1618702&r1=1618701&r2=1618702&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
Mon Aug 18 18:47:10 2014
@@ -47,6 +47,8 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.MapRedStats;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
import org.apache.hadoop.hive.ql.history.HiveHistory;
@@ -177,6 +179,8 @@ public class SessionState {
private String userIpAddress;
+ private SparkSession sparkSession;
+
/**
* Lineage state.
*/
@@ -1070,6 +1074,16 @@ public class SessionState {
tezSessionState = null;
}
+ if (sparkSession != null) {
+ try {
+ SparkSessionManagerImpl.getInstance().closeSession(sparkSession);
+ } catch(Exception ex) {
+ LOG.error("Error closing spark session.", ex);
+ } finally {
+ sparkSession = null;
+ }
+ }
+
dropSessionPaths(conf);
}
@@ -1160,4 +1174,12 @@ public class SessionState {
this.userIpAddress = userIpAddress;
}
+
+ public SparkSession getSparkSession() {
+ return sparkSession;
+ }
+
+ public void setSparkSession(SparkSession sparkSession) {
+ this.sparkSession = sparkSession;
+ }
}
Added:
hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java?rev=1618702&view=auto
==============================================================================
---
hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
(added)
+++
hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
Mon Aug 18 18:47:10 2014
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestSparkSessionManagerImpl {
+ private static final Log LOG = LogFactory.getLog(TestSparkSessionManagerImpl.class);
+
+ private SparkSessionManagerImpl sessionManagerHS2 = null;
+ private boolean anyFailedSessionThread; // updated only when a thread has failed.
+
+
+ /** Tests CLI scenario where we get a single session and use it multiple times. */
+ @Test
+ public void testSingleSessionMultipleUse() throws Exception {
+ HiveConf conf = new HiveConf();
+
+ SparkSessionManager sessionManager = SparkSessionManagerImpl.getInstance();
+ SparkSession sparkSession1 = sessionManager.getSession(null, conf, true);
+
+ assertTrue(sparkSession1.isOpen());
+
+ SparkSession sparkSession2 = sessionManager.getSession(sparkSession1, conf, true);
+ assertTrue(sparkSession1 == sparkSession2); // Same session object is expected.
+
+ assertTrue(sparkSession2.isOpen());
+ sessionManager.shutdown();
+ sessionManager.closeSession(sparkSession1);
+ }
+
+ /**
+ * Tests multi-user scenario (like HiveServer2) where each user gets a session
+ * and uses it multiple times.
+ */
+ @Test
+ public void testMultiSessionMultipleUse() throws Exception {
+ sessionManagerHS2 = SparkSessionManagerImpl.getInstance();
+
+ // Shutdown existing session manager
+ sessionManagerHS2.shutdown();
+
+ HiveConf hiveConf = new HiveConf();
+ sessionManagerHS2.setup(hiveConf);
+
+ List<Thread> threadList = new ArrayList<Thread>();
+ for (int i = 0; i < 10; i++) {
+ Thread t = new Thread(new SessionThread(), "Session thread " + i);
+ t.start();
+ threadList.add(t);
+ }
+
+ for (Thread t : threadList) {
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ String msg = "Interrupted while waiting for test session threads.";
+ LOG.error(msg, e);
+ fail(msg);
+ }
+ }
+
+ assertFalse("At least one of the session threads failed. See the test output for details.",
+ anyFailedSessionThread);
+
+ System.out.println("Ending SessionManagerHS2");
+ sessionManagerHS2.shutdown();
+ }
+
+ /* Thread simulating a user session in HiveServer2. */
+ public class SessionThread implements Runnable {
+
+
+ @Override
+ public void run() {
+ try {
+ Random random = new Random(Thread.currentThread().getId());
+ String threadName = Thread.currentThread().getName();
+ System.out.println(threadName + " started.");
+ HiveConf conf = new HiveConf();
+ SparkSession prevSession = null;
+ SparkSession currentSession = null;
+
+ for(int i = 0; i < 5; i++) {
+ currentSession = sessionManagerHS2.getSession(prevSession, conf, true);
+ assertTrue(prevSession == null || prevSession == currentSession);
+ assertTrue(currentSession.isOpen());
+ System.out.println(String.format("%s got session (%d): %s",
+ threadName, i, currentSession.getSessionId()));
+ Thread.sleep((random.nextInt(3)+1) * 1000);
+
+ sessionManagerHS2.returnSession(currentSession);
+ prevSession = currentSession;
+ }
+ sessionManagerHS2.closeSession(currentSession);
+ System.out.println(threadName + " ended.");
+ } catch (Throwable e) {
+ anyFailedSessionThread = true;
+ String msg = String.format("Error executing '%s'", Thread.currentThread().getName());
+ LOG.error(msg, e);
+ fail(msg + " " + StringUtils.stringifyException(e));
+ }
+ }
+ }
+}
Modified:
hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1618702&r1=1618701&r2=1618702&view=diff
==============================================================================
---
hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java
(original)
+++
hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java
Mon Aug 18 18:47:10 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.common.Log
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.service.CompositeService;
@@ -87,6 +88,14 @@ public class HiveServer2 extends Composi
e.printStackTrace();
}
}
+
+ if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ try {
+ SparkSessionManagerImpl.getInstance().shutdown();
+ } catch(Exception ex) {
+ LOG.error("Spark session pool manager failed to stop during HiveServer2 shutdown.", ex);
+ }
+ }
}
private static void startHiveServer2() throws Throwable {
@@ -104,6 +113,10 @@ public class HiveServer2 extends Composi
sessionPool.setupPool(hiveConf);
sessionPool.startPool();
}
+
+ if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ SparkSessionManagerImpl.getInstance().setup(hiveConf);
+ }
break;
} catch (Throwable throwable) {
if(++attempts >= maxAttempts) {