Repository: tajo Updated Branches: refs/heads/master b49ff30b9 -> fe8708519
TAJO-980: execution page in Web UI broken. Closes #97 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fe870851 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fe870851 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fe870851 Branch: refs/heads/master Commit: fe870851934e76cde745f84b0b8061480031a7d1 Parents: b49ff30 Author: Hyunsik Choi <[email protected]> Authored: Tue Jul 29 11:39:23 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Tue Jul 29 11:39:23 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/webapp/QueryExecutorServlet.java | 200 ++++++++++++------- 2 files changed, 134 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/fe870851/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 95747a1..4390177 100644 --- a/CHANGES +++ b/CHANGES @@ -97,6 +97,8 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-980: execution page in Web UI broken. (hyunsik) + TAJO-952: Wrong default partition volume config. (Mai Hai Thanh via jihoon) TAJO-974: Eliminate unexpected case condition in SubQuery. (Hyoungjun Kim http://git-wip-us.apache.org/repos/asf/tajo/blob/fe870851/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java index 3cb7d25..8b849ca 100644 --- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java +++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java @@ -1,9 +1,11 @@ package org.apache.tajo.webapp; +import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.StringUtils; import org.apache.tajo.QueryId; +import org.apache.tajo.QueryIdFactory; import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.TableDesc; @@ -26,6 +28,7 @@ import java.io.IOException; import java.io.OutputStream; import java.sql.ResultSet; import java.sql.ResultSetMetaData; +import java.sql.SQLException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.HashMap; @@ -159,7 +162,7 @@ public class QueryExecutorServlet extends HttpServlet { return; } returnValue.put("numOfRows", queryRunner.numOfRows); - returnValue.put("resultSize", queryRunner.resultSize); + returnValue.put("resultSize", queryRunner.resultRows); returnValue.put("resultData", queryRunner.queryResult); returnValue.put("resultColumns", queryRunner.columnNames); returnValue.put("runningTime", JSPUtil.getElapsedTime(queryRunner.startTime, queryRunner.finishTime)); @@ -239,12 +242,12 @@ public class QueryExecutorServlet extends HttpServlet { String queryRunnerId; - ClientProtos.SubmitQueryResponse queryRespons; + ClientProtos.SubmitQueryResponse response; AtomicBoolean running = new AtomicBoolean(true); AtomicBoolean stop = new AtomicBoolean(false); QueryId queryId; String query; - long resultSize; + long resultRows; int sizeLimit; long numOfRows; Exception error; @@ -268,60 +271,110 @@ public class QueryExecutorServlet extends HttpServlet { public void run() { startTime = System.currentTimeMillis(); try { - queryRespons = tajoClient.executeQuery(query); - if (queryRespons.getResultCode() == ClientProtos.ResultCode.OK) { - QueryId queryId = null; - try { - queryId = new QueryId(queryRespons.getQueryId()); + response = tajoClient.executeQuery(query); + + if (response == null) { + LOG.error("Internal Error: SubmissionResponse is NULL"); + error = new Exception("Internal Error: SubmissionResponse is NULL"); + + } else if (response.getResultCode() == ClientProtos.ResultCode.OK) { + if (response.getIsForwarded()) { + queryId = new QueryId(response.getQueryId()); getQueryResult(queryId); - } finally { - if (queryId != null) { - tajoClient.closeQuery(queryId); + } else { + if (!response.hasTableDesc() && !response.hasResultSet()) { + } else { + getSimpleQueryResult(response); } + + progress.set(100); } - } else { - LOG.error("queryRespons.getResultCode() not OK:" + queryRespons.getResultCode()); - error = new Exception("queryRespons.getResultCode() not OK:" + queryRespons.getResultCode()); } } catch (Exception e) { LOG.error(e.getMessage(), e); error = e; } finally { running.set(false); + finishTime = System.currentTimeMillis(); + + if (queryId != null) { + tajoClient.closeQuery(queryId); + } } } - private void getQueryResult(QueryId tajoQueryId) { - // query execute + private void getSimpleQueryResult(ClientProtos.SubmitQueryResponse response) { + ResultSet res = null; try { - QueryStatus status = null; + QueryId queryId = new QueryId(response.getQueryId()); + TableDesc desc = new TableDesc(response.getTableDesc()); - while (!stop.get()) { + if (response.getMaxRowNum() < 0 && queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { + // non-forwarded INSERT INTO query does not have any query id. + // In this case, it just returns succeeded query information without printing the query results. + } else { + res = TajoClient.createResultSet(tajoClient, response); + MakeResultText(res, desc); + } + progress.set(100); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + error = e; + } finally { + if (res != null) { try { - Thread.sleep(1000); - } catch(InterruptedException e) { - break; - } - status = tajoClient.getQueryStatus(tajoQueryId); - if (status.getState() == TajoProtos.QueryState.QUERY_MASTER_INIT - || status.getState() == TajoProtos.QueryState.QUERY_MASTER_LAUNCHED) { - continue; + res.close(); + } catch (SQLException e) { } + } + } + } - if (status.getState() == TajoProtos.QueryState.QUERY_RUNNING - || status.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) { - int progressValue = (int) (status.getProgress() * 100.0f); - if(progressValue == 100) { - progressValue = 99; - } - progress.set(progressValue); - } - if (status.getState() != TajoProtos.QueryState.QUERY_RUNNING - && status.getState() != TajoProtos.QueryState.QUERY_NOT_ASSIGNED) { - break; + private QueryStatus waitForComplete(QueryId queryid) throws ServiceException { + QueryStatus status = null; + + while (!stop.get()) { + + try { + Thread.sleep(150); + } catch(InterruptedException e) { + break; + } + + status = tajoClient.getQueryStatus(queryid); + if (status.getState() == TajoProtos.QueryState.QUERY_MASTER_INIT + || status.getState() == TajoProtos.QueryState.QUERY_MASTER_LAUNCHED) { + continue; + } + + if (status.getState() == TajoProtos.QueryState.QUERY_RUNNING + || status.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) { + int progressValue = (int) (status.getProgress() * 100.0f); + if(progressValue == 100) { + progressValue = 99; } + progress.set(progressValue); } + if (status.getState() != TajoProtos.QueryState.QUERY_RUNNING + && status.getState() != TajoProtos.QueryState.QUERY_NOT_ASSIGNED) { + break; + } + + try { + Thread.sleep(100); + } catch(InterruptedException e) { + break; + } + } + + return status; + } + + private void getQueryResult(QueryId tajoQueryId) { + // query execute + try { + QueryStatus status = waitForComplete(tajoQueryId); if(status == null) { LOG.error("Query Status is null"); @@ -344,44 +397,21 @@ public class QueryExecutorServlet extends HttpServlet { tajoClient.getConf().setVar(TajoConf.ConfVars.USERNAME, response.getTajoUserName()); res = new TajoResultSet(tajoClient, queryId, tajoClient.getConf(), desc); - ResultSetMetaData rsmd = res.getMetaData(); - resultSize = desc.getStats().getNumBytes(); - LOG.info("Tajo Query Result: " + desc.getPath() + "\n"); - - int numOfColumns = rsmd.getColumnCount(); - for(int i = 0; i < numOfColumns; i++) { - columnNames.add(rsmd.getColumnName(i + 1)); - } - queryResult = new ArrayList<List<Object>>(); - - if(sizeLimit < resultSize) { - numOfRows = (long)((float)(desc.getStats().getNumRows()) * ((float)sizeLimit / (float)resultSize)); - } else { - numOfRows = desc.getStats().getNumRows(); - } - int rowCount = 0; - boolean hasMoreData = false; - while (res.next()) { - if(rowCount > numOfRows) { - hasMoreData = true; - break; - } - List<Object> row = new ArrayList<Object>(); - for(int i = 0; i < numOfColumns; i++) { - row.add(res.getObject(i + 1).toString()); - } - queryResult.add(row); - rowCount++; + MakeResultText(res, desc); - } } finally { if (res != null) { res.close(); } progress.set(100); } - } else { - error = new Exception(queryId + " no result"); + } else { // CTAS or INSERT (OVERWRITE) INTO + progress.set(100); + try { + tajoClient.closeQuery(queryId); + } catch (Exception e) { + LOG.warn(e); + } } } } @@ -390,5 +420,39 @@ public class QueryExecutorServlet extends HttpServlet { error = e; } } + + private void MakeResultText(ResultSet res, TableDesc desc) throws SQLException { + ResultSetMetaData rsmd = res.getMetaData(); + resultRows = desc.getStats() == null ? 0 : desc.getStats().getNumRows(); + if (resultRows == 0) { + resultRows = 1000; + } + LOG.info("Tajo Query Result: " + desc.getPath() + "\n"); + + int numOfColumns = rsmd.getColumnCount(); + for(int i = 0; i < numOfColumns; i++) { + columnNames.add(rsmd.getColumnName(i + 1)); + } + queryResult = new ArrayList<List<Object>>(); + + if(sizeLimit < resultRows) { + numOfRows = (long)((float)(resultRows) * ((float)sizeLimit / (float) resultRows)); + } else { + numOfRows = resultRows; + } + + int rowCount = 0; + while (res.next()) { + if(rowCount > numOfRows) { + break; + } + List<Object> row = new ArrayList<Object>(); + for(int i = 0; i < numOfColumns; i++) { + row.add(res.getObject(i + 1).toString()); + } + queryResult.add(row); + rowCount++; + } + } } }
