Repository: lens Updated Branches: refs/heads/master 4556773e3 -> fe4ea6bd6
LENS-1323 : Fix TestRemoteHiveDriver#testMultiThreadClient test failure Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/fe4ea6bd Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/fe4ea6bd Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/fe4ea6bd Branch: refs/heads/master Commit: fe4ea6bd60596f7fb62239d9011050b338acced0 Parents: 4556773 Author: Rajat Khandelwal <pro...@apache.org> Authored: Sat Sep 17 11:57:39 2016 +0530 Committer: Amareshwari Sriramadasu <amareshw...@apache.org> Committed: Sat Sep 17 11:57:39 2016 +0530 ---------------------------------------------------------------------- .../org/apache/lens/driver/hive/HiveDriver.java | 87 +++++++++----------- .../lens/driver/hive/TestRemoteHiveDriver.java | 6 +- 2 files changed, 44 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/fe4ea6bd/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java index 0218be3..84d9933 100644 --- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java +++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java @@ -583,7 +583,7 @@ public class HiveDriver extends AbstractLensDriver { } private DriverQueryStatus updateDriverStateFromOperationStatus(OperationHandle handle, DriverQueryStatus status) - throws LensException, HiveSQLException { + throws LensException, HiveSQLException, IOException { if (status == null) { status = new DriverQueryStatus(); } @@ -626,34 +626,13 @@ public class HiveDriver extends AbstractLensDriver { default: throw new LensException("Query is in unknown state at HiveServer"); } - return status; - } - /* - * (non-Javadoc) - * - * @see org.apache.lens.server.api.driver.LensDriver#updateStatus(org.apache.lens.server.api.query.QueryContext) - */ - @Override - public void updateStatus(QueryContext context) throws LensException { - log.debug("GetStatus: {}", context.getQueryHandle()); - if (context.getDriverStatus().isFinished()) { - return; - } - OperationHandle hiveHandle = getHiveHandle(context.getQueryHandle()); - ByteArrayInputStream in = null; - try { - // Get operation status from hive server - log.debug("GetStatus hiveHandle: {}", hiveHandle); - fetchLogs(hiveHandle); - OperationStatus opStatus = getClient().getOperationStatus(hiveHandle); - updateDriverStateFromOperationStatus(hiveHandle, context.getDriverStatus()); - float progress = 0f; - String jsonTaskStatus = opStatus.getTaskStatus(); - String errorMsg = null; - if (StringUtils.isNotBlank(jsonTaskStatus)) { + float progress = 0f; + String jsonTaskStatus = opStatus.getTaskStatus(); + String errorMsg = null; + if (StringUtils.isNotBlank(jsonTaskStatus)) { + try (ByteArrayInputStream in = new ByteArrayInputStream(jsonTaskStatus.getBytes("UTF-8"))) { ObjectMapper mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - in = new ByteArrayInputStream(jsonTaskStatus.getBytes("UTF-8")); List<TaskDisplay> taskStatuses = mapper.readValue(in, new TypeReference<List<TaskDisplay>>() { }); int completedTasks = 0; @@ -670,32 +649,44 @@ public class HiveDriver extends AbstractLensDriver { } progress = taskStatuses.size() == 0 ? 0 : (float) completedTasks / taskStatuses.size(); errorMsg = errorMessage.toString(); - } else { - log.warn("Empty task statuses"); - } - String error = null; - if (StringUtils.isNotBlank(errorMsg)) { - error = errorMsg; - } else if (opStatus.getState().equals(OperationState.ERROR)) { - error = context.getDriverStatus().getErrorMessage(); } - context.getDriverStatus().setErrorMessage(error); - context.getDriverStatus().setProgressMessage(jsonTaskStatus); - context.getDriverStatus().setProgress(progress); - context.getDriverStatus().setDriverStartTime(opStatus.getOperationStarted()); - context.getDriverStatus().setDriverFinishTime(opStatus.getOperationCompleted()); + } else { + log.warn("Empty task statuses"); + } + String error = null; + if (StringUtils.isNotBlank(errorMsg)) { + error = errorMsg; + } else if (status.getState().equals(DriverQueryState.FAILED)) { + error = status.getErrorMessage(); + } + status.setErrorMessage(error); + status.setProgressMessage(jsonTaskStatus); + status.setProgress(progress); + status.setDriverStartTime(opStatus.getOperationStarted()); + status.setDriverFinishTime(opStatus.getOperationCompleted()); + return status; + } + /* + * (non-Javadoc) + * + * @see org.apache.lens.server.api.driver.LensDriver#updateStatus(org.apache.lens.server.api.query.QueryContext) + */ + @Override + public void updateStatus(QueryContext context) throws LensException { + log.debug("GetStatus: {}", context.getQueryHandle()); + if (context.getDriverStatus().isFinished()) { + return; + } + OperationHandle hiveHandle = getHiveHandle(context.getQueryHandle()); + try { + // Get operation status from hive server + log.debug("GetStatus hiveHandle: {}", hiveHandle); + fetchLogs(hiveHandle); + updateDriverStateFromOperationStatus(hiveHandle, context.getDriverStatus()); } catch (Exception e) { log.error("Error getting query status", e); handleHiveServerError(context, e); throw new LensException("Error getting query status", e); - } finally { - if (in != null) { - try { - in.close(); - } catch (IOException e) { - log.error("Error closing stream.", e); - } - } } } http://git-wip-us.apache.org/repos/asf/lens/blob/fe4ea6bd/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java index 8a776a8..1acbb13 100644 --- a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java +++ b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java @@ -179,12 +179,14 @@ public class TestRemoteHiveDriver extends TestHiveDriver { final int THREADS = 5; final long POLL_DELAY = 500; List<Thread> thrs = new ArrayList<Thread>(); + List<QueryContext> queries = new ArrayList<>(); final AtomicInteger errCount = new AtomicInteger(); for (int q = 0; q < QUERIES; q++) { final QueryContext qctx; try { qctx = createContext("SELECT * FROM test_multithreads", queryConf, thrDriver); thrDriver.executeAsync(qctx); + queries.add(qctx); } catch (LensException e) { errCount.incrementAndGet(); log.info(q + " executeAsync error: " + e.getCause()); @@ -205,7 +207,6 @@ public class TestRemoteHiveDriver extends TestHiveDriver { thrDriver.updateStatus(qctx); if (qctx.getDriverStatus().isFinished()) { log.info("@@ " + handle.getHandleId() + " >> " + qctx.getDriverStatus().getState()); - thrDriver.closeQuery(handle); break; } Thread.sleep(POLL_DELAY); @@ -233,6 +234,9 @@ public class TestRemoteHiveDriver extends TestHiveDriver { log.warn("Not ended yet: " + th.getName()); } } + for (QueryContext queryContext: queries) { + thrDriver.closeQuery(queryContext.getQueryHandle()); + } Assert.assertEquals(0, thrDriver.getHiveHandleSize()); log.info("@@ Completed all pollers. Total thrift errors: " + errCount.get()); assertEquals(launchedQueries, QUERIES);