Author: brock
Date: Mon Aug 18 18:47:10 2014
New Revision: 1618702

URL: http://svn.apache.org/r1618702
Log:
HIVE-7606 - Design SparkSession, SparkSessionManager [Spark Branch] (Venki 
Korukanti via Brock)

Added:
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
    
hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/
    
hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
Modified:
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
    
hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1618702&r1=1618701&r2=1618702&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
 Mon Aug 18 18:47:10 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.s
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -28,10 +29,15 @@ import org.apache.hadoop.hive.ql.QueryPl
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
 
@@ -50,18 +56,28 @@ public class SparkTask extends Task<Spar
   public int execute(DriverContext driverContext) {
 
     int rc = 1;
-    SparkClient client = null;
+    SparkSession sparkSession = null;
+    SparkSessionManager sparkSessionManager = null;
     try {
       configureNumberOfReducers();
-      client = SparkClient.getInstance(driverContext.getCtx().getConf());
-      rc = client.execute(driverContext, getWork());
+      sparkSessionManager = SparkSessionManagerImpl.getInstance();
+      sparkSession = SessionState.get().getSparkSession();
+      sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
+      SessionState.get().setSparkSession(sparkSession);
+
+      rc = sparkSession.submit(driverContext, getWork());
     } catch (Exception e) {
       LOG.error("Failed to execute spark task.", e);
       return 1;
     }
     finally {
-      if (client != null) {
+      if (sparkSession != null && sparkSessionManager != null) {
         rc = close(rc);
+        try {
+          sparkSessionManager.returnSession(sparkSession);
+        } catch(HiveException ex) {
+          LOG.error("Failed to return the session to SessionManager", ex);
+        }
       }
     }
     return rc;

Added: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java?rev=1618702&view=auto
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
 (added)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
 Mon Aug 18 18:47:10 2014
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+/**
+ * Handle through which Hive submits work to Spark. A session is opened with a
+ * configuration, used to submit jobs, and closed when it is no longer needed.
+ */
+public interface SparkSession {
+  /**
+   * Initializes this Spark session for DAG execution.
+   *
+   * @param conf configuration to associate with the session
+   */
+  void open(HiveConf conf);
+
+  /**
+   * Submits the given <i>sparkWork</i> to SparkClient.
+   *
+   * @param driverContext driver context of the query being executed
+   * @param sparkWork work to be executed
+   * @return return code of the execution
+   */
+  int submit(DriverContext driverContext, SparkWork sparkWork);
+
+  /**
+   * @return true if the session is open and ready to submit jobs
+   */
+  boolean isOpen();
+
+  /**
+   * @return the configuration of this session
+   */
+  HiveConf getConf();
+
+  /**
+   * @return the id of this session
+   */
+  String getSessionId();
+
+  /**
+   * Closes the session and releases its resources.
+   */
+  void close();
+}

Added: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java?rev=1618702&view=auto
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
 (added)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
 Mon Aug 18 18:47:10 2014
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.spark.SparkClient;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+import java.util.UUID;
+
+/**
+ * Simple implementation of <i>SparkSession</i>. Each instance carries its own id,
+ * configuration and open/closed flag; jobs are currently just handed over to
+ * SparkClient, which is shared by all SparkSession instances.
+ */
+public class SparkSessionImpl implements SparkSession {
+  private final String sessionId;
+  private HiveConf conf;
+  private boolean isOpen;
+
+  public SparkSessionImpl() {
+    this.sessionId = makeSessionId();
+  }
+
+  /** Generates a random UUID string to be used as a session id. */
+  public static String makeSessionId() {
+    return UUID.randomUUID().toString();
+  }
+
+  /** Remembers the given configuration and marks the session as open. */
+  @Override
+  public void open(HiveConf conf) {
+    this.conf = conf;
+    this.isOpen = true;
+  }
+
+  /**
+   * Executes the given work through SparkClient. The client is looked up with the
+   * configuration of the driver context, not with the one passed to <i>open</i>.
+   *
+   * @throws IllegalStateException if the session is not open
+   */
+  @Override
+  public int submit(DriverContext driverContext, SparkWork sparkWork) {
+    Preconditions.checkState(isOpen, "Session is not open. Can't submit jobs.");
+    SparkClient client = SparkClient.getInstance(driverContext.getCtx().getConf());
+    return client.execute(driverContext, sparkWork);
+  }
+
+  @Override
+  public boolean isOpen() {
+    return this.isOpen;
+  }
+
+  @Override
+  public HiveConf getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public String getSessionId() {
+    return this.sessionId;
+  }
+
+  /** Marks the session as closed; the configuration and id are kept. */
+  @Override
+  public void close() {
+    this.isOpen = false;
+  }
+}

Added: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java?rev=1618702&view=auto
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java
 (added)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java
 Mon Aug 18 18:47:10 2014
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * Defines interface for managing multiple SparkSessions in Hive when multiple users
+ * are executing queries simultaneously on Spark execution engine.
+ */
+public interface SparkSessionManager {
+  /**
+   * Initializes the manager based on the given configuration.
+   *
+   * @param hiveConf configuration to initialize from
+   * @throws HiveException if the manager cannot be set up
+   */
+  void setup(HiveConf hiveConf) throws HiveException;
+
+  /**
+   * Gets a valid SparkSession. The manager first checks whether
+   * <i>existingSession</i> is reusable based on the given <i>conf</i>. If it is not,
+   * <i>existingSession</i> is released and a new session is returned based on the
+   * session manager criteria and <i>conf</i>.
+   *
+   * @param existingSession Existing session (can be null)
+   * @param conf configuration the returned session has to be valid for
+   * @param doOpen Should the session be opened before returning?
+   * @return a session that can be used with <i>conf</i>
+   * @throws HiveException if a session cannot be provided
+   */
+  SparkSession getSession(SparkSession existingSession, HiveConf conf,
+      boolean doOpen) throws HiveException;
+
+  /**
+   * Returns the given <i>sparkSession</i> to the pool. This is used when the client
+   * still holds references to the session and may want to reuse it in future.
+   * When the client wants to reuse the session, it should pass it to the
+   * <i>getSession</i> method.
+   *
+   * @param sparkSession session being handed back
+   * @throws HiveException if the session cannot be returned
+   */
+  void returnSession(SparkSession sparkSession) throws HiveException;
+
+  /**
+   * Closes the given session and returns it to the pool. This is used when the
+   * client no longer needs a SparkSession.
+   *
+   * @param sparkSession session to close
+   * @throws HiveException if the session cannot be closed
+   */
+  void closeSession(SparkSession sparkSession) throws HiveException;
+
+  /**
+   * Shuts down the session manager, also closing the SparkSessions in the pool.
+   */
+  void shutdown();
+}

Added: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java?rev=1618702&view=auto
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
 (added)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
 Mon Aug 18 18:47:10 2014
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Simple implementation of <i>SparkSessionManager</i>
+ *   - returns SparkSession when requested through <i>getSession</i> and keeps track of
+ *       created sessions. Currently no limit on the number of sessions.
+ *   - SparkSession is reused if the userName in new conf and user name in session conf match.
+ */
+public class SparkSessionManagerImpl implements SparkSessionManager {
+  private static final Log LOG = LogFactory.getLog(SparkSessionManagerImpl.class);
+
+  // Created exactly once and never replaced. Re-creating the set on every init() allowed
+  // two threads racing through the lazy initialization in getSession(), or a setup() call
+  // made after sessions were already handed out, to swap the set and lose track of live
+  // sessions. It also left the set null for a closeSession() call made before any init.
+  private final Set<SparkSession> createdSessions =
+      Collections.synchronizedSet(new HashSet<SparkSession>());
+
+  // Read and written by different threads without a common lock.
+  private volatile boolean inited;
+
+  // Read by the shutdown hook thread outside of the getInstance() lock.
+  private static volatile SparkSessionManagerImpl instance;
+
+  static {
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        try {
+          SparkSessionManagerImpl manager = instance;
+          if (manager != null) {
+            manager.shutdown();
+          }
+        } catch (Exception ignored) {
+          // Best effort only: the JVM is going down, there is nothing left to recover.
+        }
+      }
+    });
+  }
+
+  /**
+   * Returns the process-wide session manager, creating it on first use.
+   */
+  public synchronized static SparkSessionManagerImpl getInstance()
+      throws HiveException {
+    if (instance == null) {
+      instance = new SparkSessionManagerImpl();
+    }
+
+    return instance;
+  }
+
+  private SparkSessionManagerImpl() {
+  }
+
+  @Override
+  public void setup(HiveConf hiveConf) throws HiveException {
+    LOG.info("Setting up the session manager.");
+    init();
+  }
+
+  /** Marks the manager as initialized; sessions that are already tracked are kept. */
+  private void init() {
+    inited = true;
+  }
+
+  /**
+   * If the <i>existingSession</i> can be reused return it.
+   * Otherwise
+   *   - close it and remove it from the list.
+   *   - create a new session and add it to the list.
+   *
+   * @param existingSession session the caller currently holds (can be null)
+   * @param conf configuration the returned session is opened with
+   * @param doOpen whether to open the session before returning it
+   * @return the reused or the newly created session
+   * @throws HiveException if the user information cannot be read from the configurations
+   * @throws IllegalStateException if a reusable session is not tracked by this manager
+   */
+  @Override
+  public SparkSession getSession(SparkSession existingSession, HiveConf conf,
+      boolean doOpen) throws HiveException {
+    if (!inited) {
+      init();
+    }
+
+    if (existingSession != null) {
+      if (canReuseSession(existingSession, conf)) {
+        // Open the session if it is closed.
+        if (!existingSession.isOpen() && doOpen) {
+          existingSession.open(conf);
+        }
+
+        Preconditions.checkState(createdSessions.contains(existingSession));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Existing session (%s) is reused.",
+              existingSession.getSessionId()));
+        }
+        return existingSession;
+      } else {
+        // Close the session, as the client is holding onto a session that can't be used
+        // by the client anymore.
+        closeSession(existingSession);
+      }
+    }
+
+    SparkSession sparkSession = new SparkSessionImpl();
+    createdSessions.add(sparkSession);
+
+    if (doOpen) {
+      sparkSession.open(conf);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("New session (%s) is created.", sparkSession.getSessionId()));
+    }
+    return sparkSession;
+  }
+
+  /**
+   * Currently we only match the userNames in existingSession conf and given conf.
+   *
+   * @throws HiveException if the user name cannot be obtained for either configuration
+   */
+  private boolean canReuseSession(SparkSession existingSession, HiveConf conf)
+      throws HiveException {
+    try {
+      UserGroupInformation newUgi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+      String newUserName = ShimLoader.getHadoopShims().getShortUserName(newUgi);
+
+      UserGroupInformation ugiInSession =
+          ShimLoader.getHadoopShims().getUGIForConf(existingSession.getConf());
+      String userNameInSession = ShimLoader.getHadoopShims().getShortUserName(ugiInSession);
+
+      return newUserName.equals(userNameInSession);
+    } catch(Exception ex) {
+      throw new HiveException("Failed to get user info from HiveConf.", ex);
+    }
+  }
+
+  @Override
+  public void returnSession(SparkSession sparkSession) throws HiveException {
+    // In this particular SparkSessionManager implementation, we don't recycle
+    // returned sessions. References to session are still valid.
+  }
+
+  /**
+   * Closes the given session and stops tracking it. A null session is ignored.
+   */
+  @Override
+  public void closeSession(SparkSession sparkSession) throws HiveException {
+    if (sparkSession == null) {
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Closing session (%s).", sparkSession.getSessionId()));
+    }
+    sparkSession.close();
+    createdSessions.remove(sparkSession);
+  }
+
+  /**
+   * Closes every tracked session, forgets them and marks the manager as uninitialized.
+   */
+  @Override
+  public void shutdown() {
+    LOG.info("Closing the session manager.");
+    // Iterating a synchronized set has to be guarded manually by locking on the set.
+    synchronized(createdSessions) {
+      for (SparkSession session : createdSessions) {
+        session.close();
+      }
+      createdSessions.clear();
+    }
+    inited = false;
+  }
+}

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1618702&r1=1618701&r2=1618702&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
 Mon Aug 18 18:47:10 2014
@@ -47,6 +47,8 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
@@ -177,6 +179,8 @@ public class SessionState {
 
   private String userIpAddress;
 
+  private SparkSession sparkSession;
+
   /**
    * Lineage state.
    */
@@ -1070,6 +1074,16 @@ public class SessionState {
       tezSessionState = null;
     }
 
+    if (sparkSession != null) {
+      try {
+        SparkSessionManagerImpl.getInstance().closeSession(sparkSession);
+      } catch(Exception ex) {
+        LOG.error("Error closing spark session.", ex);
+      } finally {
+        sparkSession = null;
+      }
+    }
+
     dropSessionPaths(conf);
   }
 
@@ -1160,4 +1174,12 @@ public class SessionState {
     this.userIpAddress = userIpAddress;
   }
 
+
+  /**
+   * @return the Spark session attached to this session state, or null if none is set
+   */
+  public SparkSession getSparkSession() {
+    return this.sparkSession;
+  }
+
+  /**
+   * Attaches the given Spark session to this session state.
+   *
+   * @param sparkSession session to remember
+   */
+  public void setSparkSession(SparkSession sparkSession) {
+    this.sparkSession = sparkSession;
+  }
 }

Added: 
hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java?rev=1618702&view=auto
==============================================================================
--- 
hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
 (added)
+++ 
hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
 Mon Aug 18 18:47:10 2014
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestSparkSessionManagerImpl {
+  private static final Log LOG = LogFactory.getLog(TestSparkSessionManagerImpl.class);
+
+  private SparkSessionManagerImpl sessionManagerHS2 = null;
+  // Updated only when a thread has failed. The test thread reads it after join()-ing
+  // every session thread, so the write is visible without further synchronization.
+  private boolean anyFailedSessionThread;
+
+
+  /** Tests CLI scenario where we get a single session and use it multiple times. */
+  @Test
+  public void testSingleSessionMultipleUse() throws Exception {
+    HiveConf conf = new HiveConf();
+
+    SparkSessionManager sessionManager = SparkSessionManagerImpl.getInstance();
+    SparkSession sparkSession1 = sessionManager.getSession(null, conf, true);
+
+    assertTrue(sparkSession1.isOpen());
+
+    SparkSession sparkSession2 = sessionManager.getSession(sparkSession1, conf, true);
+    assertTrue(sparkSession1 == sparkSession2); // Same session object is expected.
+
+    assertTrue(sparkSession2.isOpen());
+    sessionManager.shutdown();
+    sessionManager.closeSession(sparkSession1);
+  }
+
+  /**
+   * Tests multi-user scenario (like HiveServer2) where each user gets a session
+   * and uses it multiple times.
+   */
+  @Test
+  public void testMultiSessionMultipleUse() throws Exception {
+    sessionManagerHS2 = SparkSessionManagerImpl.getInstance();
+
+    // Shutdown existing session manager
+    sessionManagerHS2.shutdown();
+
+    HiveConf hiveConf = new HiveConf();
+    sessionManagerHS2.setup(hiveConf);
+
+    List<Thread> threadList = new ArrayList<Thread>();
+    for (int i = 0; i < 10; i++) {
+      Thread t = new Thread(new SessionThread(), "Session thread " + i);
+      t.start();
+      threadList.add(t);
+    }
+
+    for (Thread t : threadList) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        // Keep the interrupt status so that the test runner can still observe it.
+        Thread.currentThread().interrupt();
+        String msg = "Interrupted while waiting for test session threads.";
+        LOG.error(msg, e);
+        fail(msg);
+      }
+    }
+
+    assertFalse("At least one of the session threads failed. See the test output for details.",
+        anyFailedSessionThread);
+
+    System.out.println("Ending SessionManagerHS2");
+    sessionManagerHS2.shutdown();
+  }
+
+  /* Thread simulating a user session in HiveServer2. */
+  public class SessionThread implements Runnable {
+
+
+    @Override
+    public void run() {
+      try {
+        Random random = new Random(Thread.currentThread().getId());
+        String threadName = Thread.currentThread().getName();
+        System.out.println(threadName + " started.");
+        HiveConf conf = new HiveConf();
+        SparkSession prevSession = null;
+        SparkSession currentSession = null;
+
+        for(int i = 0; i < 5; i++) {
+          currentSession = sessionManagerHS2.getSession(prevSession, conf, true);
+          assertTrue(prevSession == null || prevSession == currentSession);
+          assertTrue(currentSession.isOpen());
+          System.out.println(String.format("%s got session (%d): %s",
+              threadName, i, currentSession.getSessionId()));
+          Thread.sleep((random.nextInt(3)+1) * 1000);
+
+          sessionManagerHS2.returnSession(currentSession);
+          prevSession = currentSession;
+        }
+        sessionManagerHS2.closeSession(currentSession);
+        System.out.println(threadName + " ended.");
+      } catch (Throwable e) {
+        // An assertion failure raised on this thread never reaches JUnit, so the failure
+        // is recorded in the flag that the test thread checks once all threads are joined.
+        anyFailedSessionThread = true;
+        String msg = String.format("Error executing '%s'", Thread.currentThread().getName());
+        LOG.error(msg, e);
+      }
+    }
+  }
+}

Modified: 
hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1618702&r1=1618701&r2=1618702&view=diff
==============================================================================
--- 
hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java
 (original)
+++ 
hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java
 Mon Aug 18 18:47:10 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.common.Log
 import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.service.CompositeService;
@@ -87,6 +88,14 @@ public class HiveServer2 extends Composi
         e.printStackTrace();
       }
     }
+
+    if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      try {
+        SparkSessionManagerImpl.getInstance().shutdown();
+      } catch(Exception ex) {
+        LOG.error("Spark session pool manager failed to stop during 
HiveServer2 shutdown.", ex);
+      }
+    }
   }
 
   private static void startHiveServer2() throws Throwable {
@@ -104,6 +113,10 @@ public class HiveServer2 extends Composi
           sessionPool.setupPool(hiveConf);
           sessionPool.startPool();
         }
+
+        if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+          SparkSessionManagerImpl.getInstance().setup(hiveConf);
+        }
         break;
       } catch (Throwable throwable) {
         if(++attempts >= maxAttempts) {


Reply via email to