LENS-760 : Session close should not result in running query failures.
Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/ff891e2c Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/ff891e2c Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/ff891e2c Branch: refs/heads/LENS-581 Commit: ff891e2cf2a77fd28a7476ad6a6af814bb013661 Parents: 7c7c86d Author: Deepak Barr <[email protected]> Authored: Sat Dec 12 00:17:47 2015 +0530 Committer: Deepak Kumar Barr <[email protected]> Committed: Sat Dec 12 00:17:47 2015 +0530 ---------------------------------------------------------------------- .../org/apache/lens/driver/hive/HiveDriver.java | 95 +++++++++++++++----- .../lens/driver/hive/TestRemoteHiveDriver.java | 4 +- .../lens/server/query/TestQueryService.java | 20 +++++ 3 files changed, 98 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/ff891e2c/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java index a84c679..253cfc4 100644 --- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java +++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java @@ -112,6 +112,12 @@ public class HiveDriver extends AbstractLensDriver { /** The hive handles. */ private Map<QueryHandle, OperationHandle> hiveHandles = new ConcurrentHashMap<QueryHandle, OperationHandle>(); + /** The orphaned hive sessions. */ + private ConcurrentLinkedQueue<SessionHandle> orphanedHiveSessions; + + /** The opHandle to hive session map. */ + private Map<OperationHandle, SessionHandle> opHandleToSession; + /** The session lock. */ private final Lock sessionLock; @@ -314,6 +320,8 @@ public class HiveDriver extends AbstractLensDriver { public HiveDriver() throws LensException { this.sessionLock = new ReentrantLock(); lensToHiveSession = new HashMap<String, SessionHandle>(); + opHandleToSession = new ConcurrentHashMap<OperationHandle, SessionHandle>(); + orphanedHiveSessions = new ConcurrentLinkedQueue<SessionHandle>(); resourcesAddedForSession = new HashMap<SessionHandle, Boolean>(); connectionExpiryThread.setDaemon(true); connectionExpiryThread.setName("HiveDriver-ConnectionExpiryThread"); @@ -491,15 +499,18 @@ public class HiveDriver extends AbstractLensDriver { */ // assuming this is only called for executing explain/insert/set/delete/etc... queries which don't ask to fetch data. public LensResultSet execute(QueryContext ctx) throws LensException { + OperationHandle op = null; try { addPersistentPath(ctx); Configuration qdconf = ctx.getDriverConf(this); qdconf.set("mapred.job.name", ctx.getQueryHandle().toString()); - OperationHandle op = getClient().executeStatement(getSession(ctx), ctx.getSelectedDriverQuery(), + SessionHandle sessionHandle = getSession(ctx); + op = getClient().executeStatement(sessionHandle, ctx.getSelectedDriverQuery(), qdconf.getValByRegex(".*")); log.info("The hive operation handle: {}", op); ctx.setDriverOpHandle(op.toString()); hiveHandles.put(ctx.getQueryHandle(), op); + opHandleToSession.put(op, sessionHandle); updateStatus(ctx); OperationStatus status = getClient().getOperationStatus(op); @@ -519,6 +530,10 @@ public class HiveDriver extends AbstractLensDriver { } catch (HiveSQLException hiveErr) { handleHiveServerError(ctx, hiveErr); throw new LensException("Error executing query", hiveErr); + } finally { + if (null != op) { + opHandleToSession.remove(op); + } } } @@ -550,11 +565,13 @@ public class HiveDriver extends AbstractLensDriver { } } queryHook.preLaunch(ctx); - OperationHandle op = getClient().executeStatementAsync(getSession(ctx), ctx.getSelectedDriverQuery(), + SessionHandle sessionHandle = getSession(ctx); + OperationHandle op = getClient().executeStatementAsync(sessionHandle, ctx.getSelectedDriverQuery(), qdconf.getValByRegex(".*")); ctx.setDriverOpHandle(op.toString()); log.info("QueryHandle: {} HiveHandle:{}", ctx.getQueryHandle(), op); hiveHandles.put(ctx.getQueryHandle(), op); + opHandleToSession.put(op, sessionHandle); } catch (IOException e) { throw new LensException("Error adding persistent path", e); } catch (HiveSQLException e) { @@ -726,6 +743,18 @@ public class HiveDriver extends AbstractLensDriver { } catch (HiveSQLException e) { checkInvalidOperation(handle, e); throw new LensException("Unable to close query", e); + } finally { + SessionHandle hiveSession = opHandleToSession.remove(opHandle); + if (null != hiveSession && !opHandleToSession.containsValue(hiveSession) + && orphanedHiveSessions.contains(hiveSession)) { + orphanedHiveSessions.remove(hiveSession); + try { + getClient().closeSession(hiveSession); + log.info("Closed orphaned hive session : {}", hiveSession.getHandleIdentifier()); + } catch (HiveSQLException e) { + log.warn("Error closing orphan hive session : {} ", hiveSession.getHandleIdentifier(), e); + } + } } } } @@ -739,6 +768,7 @@ public class HiveDriver extends AbstractLensDriver { public boolean cancelQuery(QueryHandle handle) throws LensException { log.info("CancelQuery: {}", handle); OperationHandle hiveHandle = getHiveHandle(handle); + opHandleToSession.remove(hiveHandle); try { log.info("CancelQuery hiveHandle: {}", hiveHandle); getClient().cancelOperation(hiveHandle); @@ -757,22 +787,11 @@ public class HiveDriver extends AbstractLensDriver { @Override public void close() { log.info("CloseDriver {}", getFullyQualifiedName()); - // Close this driver and release all resources + // Close this driver sessionLock.lock(); - try { - for (String lensSessionDbKey : lensToHiveSession.keySet()) { - try { - getClient().closeSession(lensToHiveSession.get(lensSessionDbKey)); - } catch (Exception e) { - checkInvalidSession(e); - log.warn("Error closing session for lens session: {}, hive session: ", lensSessionDbKey, - lensToHiveSession.get(lensSessionDbKey), e); - } - } - lensToHiveSession.clear(); - } finally { - sessionLock.unlock(); - } + lensToHiveSession.clear(); + orphanedHiveSessions.clear(); + sessionLock.unlock(); } /** @@ -1087,6 +1106,21 @@ public class HiveDriver extends AbstractLensDriver { } log.info("Hive driver {} recovered {} sessions", getFullyQualifiedName(), lensToHiveSession.size()); } + int numOpHandles = in.readInt(); + for (int i = 0; i < numOpHandles; i++) { + OperationHandle opHandle = new OperationHandle((TOperationHandle) in.readObject()); + SessionHandle sHandle = new SessionHandle((TSessionHandle) in.readObject(), + TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6); + opHandleToSession.put(opHandle, sHandle); + } + log.info("Hive driver {} recovered {} operation handles", getFullyQualifiedName(), opHandleToSession.size()); + int numOrphanedSessions = in.readInt(); + for (int i = 0; i < numOrphanedSessions; i++) { + SessionHandle sHandle = new SessionHandle((TSessionHandle) in.readObject(), + TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6); + orphanedHiveSessions.add(sHandle); + } + log.info("Hive driver {} recovered {} orphaned sessions", getFullyQualifiedName(), orphanedHiveSessions.size()); } /* @@ -1111,6 +1145,17 @@ public class HiveDriver extends AbstractLensDriver { out.writeObject(entry.getValue().toTSessionHandle()); } log.info("Hive driver {} persisted {} sessions", getFullyQualifiedName(), lensToHiveSession.size()); + out.writeInt(opHandleToSession.size()); + for (Map.Entry<OperationHandle, SessionHandle> entry : opHandleToSession.entrySet()) { + out.writeObject(entry.getKey().toTOperationHandle()); + out.writeObject(entry.getValue().toTSessionHandle()); + } + log.info("Hive driver {} persisted {} operation handles", getFullyQualifiedName(), opHandleToSession.size()); + out.writeInt(orphanedHiveSessions.size()); + for (SessionHandle sessionHandle : orphanedHiveSessions) { + out.writeObject(sessionHandle.toTSessionHandle()); + } + log.info("Hive driver {} persisted {} orphaned sessions", getFullyQualifiedName(), orphanedHiveSessions.size()); } } @@ -1243,9 +1288,15 @@ public class HiveDriver extends AbstractLensDriver { SessionHandle hiveSession = lensToHiveSession.remove(sessionDbKey); if (hiveSession != null) { try { - getClient().closeSession(hiveSession); - log.info("Closed Hive session {} for lens session {}", hiveSession.getHandleIdentifier(), - sessionDbKey); + if (isSessionClosable(hiveSession)) { + getClient().closeSession(hiveSession); + log.info("Closed Hive session {} for lens session {}", hiveSession.getHandleIdentifier(), + sessionDbKey); + } else { + log.info("Skipped closing hive session {} for lens session {} due to active operations", + hiveSession.getHandleIdentifier(), sessionDbKey); + orphanedHiveSessions.add(hiveSession); + } } catch (Exception e) { log.error("Error closing hive session {} for lens session {}", hiveSession.getHandleIdentifier(), sessionDbKey, e); @@ -1259,6 +1310,10 @@ public class HiveDriver extends AbstractLensDriver { } } + private boolean isSessionClosable(SessionHandle hiveSession) { + return !opHandleToSession.containsValue(hiveSession); + } + /** * Close all connections. */ http://git-wip-us.apache.org/repos/asf/lens/blob/ff891e2c/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java index ab5ada9..4f18c24 100644 --- a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java +++ b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java @@ -274,9 +274,11 @@ public class TestRemoteHiveDriver extends TestHiveDriver { // Write driver to stream ByteArrayOutputStream driverBytes = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(driverBytes); try { - oldDriver.writeExternal(new ObjectOutputStream(driverBytes)); + oldDriver.writeExternal(out); } finally { + out.close(); driverBytes.close(); } http://git-wip-us.apache.org/repos/asf/lens/blob/ff891e2c/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java index f6693aa..efef358 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java @@ -1472,6 +1472,26 @@ public class TestQueryService extends LensJerseyTest { getLensQueryResult(target(), lensSessionId, ctx1.getQueryHandle()); } + /** + * Test session close when a query is active on the session + * + * @throws Exception + */ + @Test + public void testSessionClose() throws Exception { + // Query with group by, will run long enough to close the session before finish + String query = "select ID, IDSTR, count(*) from " + TEST_TABLE + " group by ID, IDSTR"; + SessionService sessionService = LensServices.get().getService(HiveSessionService.NAME); + Map<String, String> sessionconf = new HashMap<String, String>(); + LensSessionHandle sessionHandle = sessionService.openSession("foo", "bar", "default", sessionconf); + LensConf conf = getLensConf(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "true"); + QueryHandle qHandle = + executeAndGetHandle(target(), Optional.of(sessionHandle), Optional.of(query), Optional.of(conf)); + sessionService.closeSession(sessionHandle); + sessionHandle = sessionService.openSession("foo", "bar", "default", sessionconf); + waitForQueryToFinish(target(), sessionHandle, qHandle, Status.SUCCESSFUL); + } + @AfterMethod private void waitForPurge() throws InterruptedException { waitForPurge(0, queryService.finishedQueries);
