Repository: tajo Updated Branches: refs/heads/master 33f4b7a06 -> c46dc1a64
TAJO-1712: querytasks.jsp throws NPE occasionally when tasks are running. (jinho) Closes #657 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c46dc1a6 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c46dc1a6 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c46dc1a6 Branch: refs/heads/master Commit: c46dc1a64f9528adc5f2a54db3bc6aa247252b84 Parents: 33f4b7a Author: Jinho Kim <[email protected]> Authored: Wed Jul 29 16:53:11 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Wed Jul 29 16:53:11 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../org/apache/tajo/master/QueryManager.java | 44 ++++++-- .../main/java/org/apache/tajo/util/JSPUtil.java | 19 ++++ .../apache/tajo/util/history/HistoryReader.java | 100 ++++++++++++------- .../src/main/resources/webapps/admin/query.jsp | 13 +-- .../resources/webapps/worker/querytasks.jsp | 18 ++-- .../util/history/TestHistoryWriterReader.java | 12 ++- 7 files changed, 152 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/c46dc1a6/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 721f8ff..1389b6f 100644 --- a/CHANGES +++ b/CHANGES @@ -200,6 +200,9 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1712: querytasks.jsp throws NPE occasionally when tasks are running. + (jinho) + TAJO-1716: Repartitioner.makeEvenDistributedFetchImpl() does not distribute fetches evenly. (jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/c46dc1a6/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index 2578c9f..8be298e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -18,6 +18,7 @@ package org.apache.tajo.master; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.collections.map.LRUMap; @@ -44,10 +45,7 @@ import org.apache.tajo.session.Session; import org.apache.tajo.util.history.HistoryReader; import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicLong; /** @@ -119,17 +117,51 @@ public class QueryManager extends CompositeService { return Collections.unmodifiableCollection(runningQueries.values()); } + @Deprecated public Collection<QueryInfo> getFinishedQueries() { Set<QueryInfo> result = Sets.newTreeSet(); + synchronized (historyCache) { result.addAll(historyCache.values()); } try { synchronized (this) { - result.addAll(this.masterContext.getHistoryReader().getQueries(null)); + result.addAll(this.masterContext.getHistoryReader().getQueriesInHistory()); + } + return result; + } catch (Throwable e) { + LOG.error(e, e); + return result; + } + } + + /** + * Get query history in cache or persistent storage + */ + public Collection<QueryInfo> getFinishedQueries(int page, int size) { + TreeSet<QueryInfo> result = Sets.newTreeSet(); + if(page <= 0 || size <= 0) { + return result; + } + + List<QueryInfo> cacheList = Lists.newArrayList(); + synchronized (historyCache) { + // request size fits in cache + if (page == 1 && size <= historyCache.size()) { + cacheList.addAll(historyCache.values()); } + } + + if (cacheList.size() > 0) { + result.addAll(cacheList.subList(0, size)); return result; + } + + try { + synchronized (this) { + return this.masterContext.getHistoryReader().getQueriesInHistory(page, size); + } } catch (Throwable e) { LOG.error(e, e); return result; @@ -144,7 +176,7 @@ public class QueryManager extends CompositeService { } if (queryInfo == null) { synchronized (this) { - queryInfo = this.masterContext.getHistoryReader().getQueryInfo(queryId.toString()); + queryInfo = this.masterContext.getHistoryReader().getQueryByQueryId(queryId); } } return queryInfo; http://git-wip-us.apache.org/repos/asf/tajo/blob/c46dc1a6/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java index 7641320..1d93c3c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java @@ -355,6 +355,25 @@ public class JSPUtil { return sb.toString(); } + public static String getPageNavigation(int currentPage, boolean next, String url) { + StringBuilder sb = new StringBuilder(); + if (currentPage > 1) { + sb.append("<a href='").append(url) + .append("&page=").append(currentPage - 1).append("'>") + .append("<prev</a>"); + sb.append(" "); + } + + sb.append(currentPage); + + if(next) { + sb.append(" ").append("<a href='").append(url) + .append("&page=").append(currentPage + 1).append("'>") + .append("next></a>"); + } + return sb.toString(); + } + public static <T extends Object> List<T> getPageNavigationList(List<T> originList, int page, int pageSize) { if (originList == null) { return new ArrayList<T>(); http://git-wip-us.apache.org/repos/asf/tajo/blob/c46dc1a6/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java index 2acd5f6..27d823e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java @@ -18,6 +18,7 @@ package org.apache.tajo.util.history; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; @@ -25,9 +26,11 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.QueryId; +import org.apache.tajo.ResourceProtos.TaskHistoryProto; import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.annotation.Nullable; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ResourceProtos.TaskHistoryProto; import org.apache.tajo.master.QueryInfo; import org.apache.tajo.util.Bytes; @@ -54,7 +57,28 @@ public class HistoryReader { taskHistoryParentPath = tajoConf.getTaskHistoryDir(tajoConf); } - public List<QueryInfo> getQueries(String keyword) throws IOException { + @Deprecated + public List<QueryInfo> getQueriesInHistory() throws IOException { + return getQueriesInHistory(-1, Integer.MAX_VALUE); + } + + public List<QueryInfo> getQueriesInHistory(int page, int size) throws IOException { + List<QueryInfo> queryList = getQueryInfoInHistory(page, size, null); + if (queryList.size() > size) { + queryList = queryList.subList(0, size); + } + + Collections.sort(queryList, new Comparator<QueryInfo>() { + @Override + public int compare(QueryInfo query1, QueryInfo query2) { + return query2.compareTo(query1); + } + }); + + return queryList; + } + + private List<QueryInfo> getQueryInfoInHistory(int page, int size, @Nullable QueryId queryId) throws IOException { List<QueryInfo> queryInfos = new ArrayList<QueryInfo>(); FileSystem fs = HistoryWriter.getNonCrcFileSystem(historyParentPath, tajoConf); @@ -62,7 +86,7 @@ public class HistoryReader { if (!fs.exists(historyParentPath)) { return queryInfos; } - } catch (Throwable e){ + } catch (Throwable e) { return queryInfos; } @@ -71,7 +95,11 @@ public class HistoryReader { return queryInfos; } - for (FileStatus eachDateFile: files) { + int startIndex = page < 1 ? page : (page - 1) * size; // set index to last index of previous page + int currentIndex = 0; + + ArrayUtils.reverse(files); + for (FileStatus eachDateFile : files) { Path queryListPath = new Path(eachDateFile.getPath(), HistoryWriter.QUERY_LIST); if (eachDateFile.isFile() || !fs.exists(queryListPath)) { continue; @@ -82,52 +110,70 @@ public class HistoryReader { continue; } - for (FileStatus eachFile: dateFiles) { + ArrayUtils.reverse(dateFiles); + for (FileStatus eachFile : dateFiles) { Path path = eachFile.getPath(); if (eachFile.isDirectory() || !path.getName().endsWith(HistoryWriter.HISTORY_FILE_POSTFIX)) { continue; } FSDataInputStream in = null; + long totalLength = 0; + try { in = fs.open(path); - byte[] buf = new byte[100 * 1024]; - while (true) { + while (totalLength < eachFile.getLen()) { int length = in.readInt(); - if (length > buf.length) { - buf = new byte[length]; + totalLength += 4; + + currentIndex++; + //skip previous page + if (startIndex >= currentIndex) { + totalLength += in.skipBytes(length); + continue; } + + byte[] buf = new byte[length]; in.readFully(buf, 0, length); + totalLength += length; + String queryInfoJson = new String(buf, 0, length, Bytes.UTF8_CHARSET); QueryInfo queryInfo = QueryInfo.fromJson(queryInfoJson); - if (keyword != null) { - if (queryInfo.getSql().indexOf(keyword) >= 0) { + + if (queryId != null) { + if (queryInfo.getQueryId().equals(queryId)) { queryInfos.add(queryInfo); + return queryInfos; } } else { queryInfos.add(queryInfo); } } - } catch (EOFException e) { } catch (Throwable e) { - LOG.warn("Reading error:" + path + ", " +e.getMessage()); + LOG.warn("Reading error:" + path + ", " + e.getMessage()); } finally { IOUtils.cleanup(LOG, in); } - } - } - Collections.sort(queryInfos, new Comparator<QueryInfo>() { - @Override - public int compare(QueryInfo query1, QueryInfo query2) { - return query2.getQueryIdStr().toString().compareTo(query1.getQueryIdStr().toString()); + if (queryInfos.size() >= size) { + return queryInfos; + } } - }); + } return queryInfos; } + public QueryInfo getQueryByQueryId(QueryId queryId) throws IOException { + List<QueryInfo> queryInfoList = getQueryInfoInHistory(-1, Integer.MAX_VALUE, queryId); + if (queryInfoList.size() > 0) { + return queryInfoList.get(0); + } else { + return null; + } + } + private Path getQueryHistoryFilePath(String queryId, long startTime) throws IOException { if (startTime == 0) { String[] tokens = queryId.split("_"); @@ -298,18 +344,4 @@ public class HistoryReader { } return null; } - - public QueryInfo getQueryInfo(String queryId) throws IOException { - List<QueryInfo> queries = getQueries(null); - - if (queries != null) { - for (QueryInfo queryInfo: queries) { - if (queryId.equals(queryInfo.getQueryId().toString())) { - return queryInfo; - } - } - } - - return null; - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/c46dc1a6/tajo-core/src/main/resources/webapps/admin/query.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp index 0701e34..7422a03 100644 --- a/tajo-core/src/main/resources/webapps/admin/query.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query.jsp @@ -59,14 +59,9 @@ } } - List<QueryInfo> allFinishedQueries = new ArrayList<QueryInfo>(master.getContext().getQueryJobManager().getFinishedQueries()); - Collections.sort(allFinishedQueries, java.util.Collections.reverseOrder()); - - int numOfFinishedQueries = allFinishedQueries.size(); - int totalPage = numOfFinishedQueries % pageSize == 0 ? - numOfFinishedQueries / pageSize : numOfFinishedQueries / pageSize + 1; - - List<QueryInfo> finishedQueries = JSPUtil.getPageNavigationList(allFinishedQueries, currentPage, pageSize); + List<QueryInfo> finishedQueries = new ArrayList<QueryInfo>( + master.getContext().getQueryJobManager().getFinishedQueries(currentPage, pageSize)); + Collections.sort(finishedQueries, java.util.Collections.reverseOrder()); SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @@ -220,7 +215,7 @@ %> </table> <div align="center"> - <%=JSPUtil.getPageNavigation(currentPage, totalPage, "query.jsp?pageSize=" + pageSize)%> + <%=JSPUtil.getPageNavigation(currentPage, finishedQueries.size() == pageSize, "query.jsp?pageSize=" + pageSize)%> </div> <p/> <% http://git-wip-us.apache.org/repos/asf/tajo/blob/c46dc1a6/tajo-core/src/main/resources/webapps/worker/querytasks.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp index 2c32006..caaec35 100644 --- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp +++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp @@ -111,16 +111,18 @@ float totalProgress = 0.0f; for(Task eachTask : allTasks) { - totalProgress += eachTask.getLastAttempt() != null ? eachTask.getLastAttempt().getProgress(): 0.0f; + numShuffles = eachTask.getShuffleOutpuNum(); - if (eachTask.getLastAttempt() != null) { - TableStats inputStats = eachTask.getLastAttempt().getInputStats(); + TaskAttempt lastAttempt = eachTask.getLastAttempt(); + if (lastAttempt != null) { + totalProgress += lastAttempt.getProgress(); + TableStats inputStats = lastAttempt.getInputStats(); if (inputStats != null) { totalInputBytes += inputStats.getNumBytes(); totalReadBytes += inputStats.getReadBytes(); totalReadRows += inputStats.getNumRows(); } - TableStats outputStats = eachTask.getLastAttempt().getResultStats(); + TableStats outputStats = lastAttempt.getResultStats(); if (outputStats != null) { totalWriteBytes += outputStats.getNumBytes(); totalWriteRows += outputStats.getNumRows(); @@ -231,18 +233,20 @@ "&taskSeq=" + taskSeq + "&sort=" + sort + "&sortOrder=" + sortOrder; TaskAttempt lastAttempt = eachTask.getLastAttempt(); - String taskHost = lastAttempt == null ? "-" : lastAttempt.getWorkerConnectionInfo().getHost(); - if(lastAttempt != null) { + String taskHost = "-"; + float progress = 0.0f; + if(lastAttempt != null && lastAttempt.getWorkerConnectionInfo() != null) { WorkerConnectionInfo conn = lastAttempt.getWorkerConnectionInfo(); TaskAttemptId lastAttemptId = lastAttempt.getId(); taskHost = "<a href='http://" + conn.getHost() + ":" + conn.getHttpInfoPort() + "/taskdetail.jsp?taskAttemptId=" + lastAttemptId + "'>" + conn.getHost() + "</a>"; + progress = eachTask.getLastAttempt().getProgress(); } %> <tr> <td align='center'><%=rowNo%></td> <td><a href="<%=taskDetailUrl%>"><%=eachTask.getId()%></a></td> <td align='center'><%=eachTask.getLastAttemptStatus()%></td> - <td align='center'><%=JSPUtil.percentFormat(eachTask.getLastAttempt().getProgress())%>%</td> + <td align='center'><%=JSPUtil.percentFormat(progress)%>%</td> <td align='center'><%=eachTask.getLaunchTime() == 0 ? "-" : df.format(eachTask.getLaunchTime())%></td> <td align='right'><%=eachTask.getLaunchTime() == 0 ? "-" : eachTask.getRunningTime() + " ms"%></td> <td align='center'><%=eachTask.getRetryCount()%></td> http://git-wip-us.apache.org/repos/asf/tajo/blob/c46dc1a6/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java index f442bde..aee418c 100644 --- a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java +++ b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java @@ -90,7 +90,7 @@ public class TestHistoryWriterReader extends QueryTestCaseBase { assertTrue(histFiles[0].getPath().getName().endsWith(".hist")); HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf); - List<QueryInfo> queryInfos = reader.getQueries(null); + List<QueryInfo> queryInfos = reader.getQueriesInHistory(1, 2); assertNotNull(queryInfos); assertEquals(2, queryInfos.size()); @@ -99,10 +99,20 @@ public class TestHistoryWriterReader extends QueryTestCaseBase { assertEquals(queryInfo2.getQueryState(), foundQueryInfo.getQueryState()); assertEquals(queryInfo2.getProgress(), foundQueryInfo.getProgress(), 0); + foundQueryInfo = reader.getQueryByQueryId(queryInfo2.getQueryId()); + assertEquals(queryInfo2.getQueryId(), foundQueryInfo.getQueryId()); + assertEquals(queryInfo2.getQueryState(), foundQueryInfo.getQueryState()); + assertEquals(queryInfo2.getProgress(), foundQueryInfo.getProgress(), 0); + foundQueryInfo = queryInfos.get(1); assertEquals(queryInfo1.getQueryId(), foundQueryInfo.getQueryId()); assertEquals(queryInfo1.getQueryState(), foundQueryInfo.getQueryState()); assertEquals(queryInfo1.getProgress(), foundQueryInfo.getProgress(), 0); + + foundQueryInfo = reader.getQueryByQueryId(queryInfo1.getQueryId()); + assertEquals(queryInfo1.getQueryId(), foundQueryInfo.getQueryId()); + assertEquals(queryInfo1.getQueryState(), foundQueryInfo.getQueryState()); + assertEquals(queryInfo1.getProgress(), foundQueryInfo.getProgress(), 0); } finally { writer.stop(); }
