Repository: tajo Updated Branches: refs/heads/master 1be0e66b2 -> d8ce56263
TAJO-1733: Finished query occasionally does not appear in Web-UI. (jinho) Closes #672 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d8ce5626 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d8ce5626 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d8ce5626 Branch: refs/heads/master Commit: d8ce5626342e64bf02be92905e8ed2a147ef85ca Parents: 1be0e66 Author: Jinho Kim <[email protected]> Authored: Mon Aug 3 17:17:04 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Mon Aug 3 17:17:04 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/master/QueryManager.java | 52 ++++++------ .../apache/tajo/util/history/HistoryReader.java | 87 ++++++++++---------- .../src/main/resources/webapps/admin/query.jsp | 6 +- .../util/history/TestHistoryWriterReader.java | 64 ++++++++++++++ 5 files changed, 137 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/d8ce5626/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 19dd980..3deac6d 100644 --- a/CHANGES +++ b/CHANGES @@ -208,6 +208,8 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1733: Finished query occasionally does not appear in Web-UI. (jinho) + TAJO-1731: With a task failure, query processing is hanged after first retry. (jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/d8ce5626/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index 8be298e..8838986 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -18,7 +18,6 @@ package org.apache.tajo.master; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.collections.map.LRUMap; @@ -126,9 +125,7 @@ public class QueryManager extends CompositeService { } try { - synchronized (this) { - result.addAll(this.masterContext.getHistoryReader().getQueriesInHistory()); - } + result.addAll(this.masterContext.getHistoryReader().getQueriesInHistory()); return result; } catch (Throwable e) { LOG.error(e, e); @@ -137,34 +134,35 @@ public class QueryManager extends CompositeService { } /** - * Get query history in cache or persistent storage + * Get desc ordered query histories in cache or persistent storage + * @param page index of page + * @param size size of page */ - public Collection<QueryInfo> getFinishedQueries(int page, int size) { - TreeSet<QueryInfo> result = Sets.newTreeSet(); - if(page <= 0 || size <= 0) { - return result; + public List<QueryInfo> getFinishedQueries(int page, int size) { + if (page <= 0 || size <= 0) { + return Collections.EMPTY_LIST; } - List<QueryInfo> cacheList = Lists.newArrayList(); - synchronized (historyCache) { + if (page * size <= historyCache.size()) { + Set<QueryInfo> result = Sets.newTreeSet(Collections.reverseOrder()); // request size fits in cache - if (page == 1 && size <= historyCache.size()) { - cacheList.addAll(historyCache.values()); + synchronized (historyCache) { + result.addAll(historyCache.values()); } - } - - if (cacheList.size() > 0) { - result.addAll(cacheList.subList(0, size)); - return result; - } - - try { - synchronized (this) { + int fromIndex = (page - 1) * size; + return new LinkedList<QueryInfo>(result).subList(fromIndex, fromIndex + size); + } else { + try { return this.masterContext.getHistoryReader().getQueriesInHistory(page, size); + } catch (Throwable e) { + LOG.error(e, e); + Set<QueryInfo> result = Sets.newTreeSet(Collections.reverseOrder()); + // request size fits in cache + synchronized (historyCache) { + result.addAll(historyCache.values()); + } + return new LinkedList<QueryInfo>(result); } - } catch (Throwable e) { - LOG.error(e, e); - return result; } } @@ -175,9 +173,7 @@ public class QueryManager extends CompositeService { queryInfo = (QueryInfo) historyCache.get(queryId); } if (queryInfo == null) { - synchronized (this) { - queryInfo = this.masterContext.getHistoryReader().getQueryByQueryId(queryId); - } + queryInfo = this.masterContext.getHistoryReader().getQueryByQueryId(queryId); } return queryInfo; } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/tajo/blob/d8ce5626/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java index 27d823e..66077cf 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java @@ -18,6 +18,8 @@ package org.apache.tajo.util.history; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -62,41 +64,37 @@ public class HistoryReader { return getQueriesInHistory(-1, Integer.MAX_VALUE); } + /** + * Get desc ordered query histories in persistent storage + * @param page index of page + * @param size size of page + */ public List<QueryInfo> getQueriesInHistory(int page, int size) throws IOException { - List<QueryInfo> queryList = getQueryInfoInHistory(page, size, null); - if (queryList.size() > size) { - queryList = queryList.subList(0, size); - } - - Collections.sort(queryList, new Comparator<QueryInfo>() { - @Override - public int compare(QueryInfo query1, QueryInfo query2) { - return query2.compareTo(query1); - } - }); - - return queryList; + return findQueryInfoInStorage(page, size, null); } - private List<QueryInfo> getQueryInfoInHistory(int page, int size, @Nullable QueryId queryId) throws IOException { - List<QueryInfo> queryInfos = new ArrayList<QueryInfo>(); + private synchronized List<QueryInfo> findQueryInfoInStorage(int page, int size, @Nullable QueryId queryId) + throws IOException { + List<QueryInfo> result = Lists.newLinkedList(); FileSystem fs = HistoryWriter.getNonCrcFileSystem(historyParentPath, tajoConf); try { if (!fs.exists(historyParentPath)) { - return queryInfos; + return result; } } catch (Throwable e) { - return queryInfos; + return result; } FileStatus[] files = fs.listStatus(historyParentPath); if (files == null || files.length == 0) { - return queryInfos; + return result; } - int startIndex = page < 1 ? page : (page - 1) * size; // set index to last index of previous page + Set<QueryInfo> queryInfos = Sets.newTreeSet(Collections.reverseOrder()); + int startIndex = page < 1 ? page : ((page - 1) * size) + 1; int currentIndex = 0; + int skipSize = 0; ArrayUtils.reverse(files); for (FileStatus eachDateFile : files) { @@ -118,55 +116,60 @@ public class HistoryReader { } FSDataInputStream in = null; - long totalLength = 0; + List<String> jsonList = Lists.newArrayList(); try { in = fs.open(path); - while (totalLength < eachFile.getLen()) { + //If history file does not close, FileStatus.getLen() are not being updated + //So, this code block should check the EOFException + while (true) { int length = in.readInt(); - totalLength += 4; - - currentIndex++; - //skip previous page - if (startIndex >= currentIndex) { - totalLength += in.skipBytes(length); - continue; - } byte[] buf = new byte[length]; in.readFully(buf, 0, length); - totalLength += length; - String queryInfoJson = new String(buf, 0, length, Bytes.UTF8_CHARSET); - QueryInfo queryInfo = QueryInfo.fromJson(queryInfoJson); + jsonList.add(new String(buf, 0, length, Bytes.UTF8_CHARSET)); + currentIndex++; + } + } catch (EOFException eof) { + } catch (Throwable e) { + LOG.warn("Reading error:" + path + ", " + e.getMessage()); + } finally { + IOUtils.cleanup(LOG, in); + } + //skip previous page + if (startIndex > currentIndex) { + skipSize += jsonList.size(); + } else { + for (String json : jsonList) { + QueryInfo queryInfo = QueryInfo.fromJson(json); if (queryId != null) { if (queryInfo.getQueryId().equals(queryId)) { - queryInfos.add(queryInfo); - return queryInfos; + result.add(queryInfo); + return result; } } else { queryInfos.add(queryInfo); } } - } catch (Throwable e) { - LOG.warn("Reading error:" + path + ", " + e.getMessage()); - } finally { - IOUtils.cleanup(LOG, in); } - if (queryInfos.size() >= size) { - return queryInfos; + if (currentIndex - (startIndex - 1) >= size) { + result.addAll(queryInfos); + int fromIndex = (startIndex - 1) - skipSize; + return result.subList(fromIndex, fromIndex + size); } } } - return queryInfos; + result.addAll(queryInfos); + return result; } public QueryInfo getQueryByQueryId(QueryId queryId) throws IOException { - List<QueryInfo> queryInfoList = getQueryInfoInHistory(-1, Integer.MAX_VALUE, queryId); + List<QueryInfo> queryInfoList = findQueryInfoInStorage(-1, Integer.MAX_VALUE, queryId); if (queryInfoList.size() > 0) { return queryInfoList.get(0); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/d8ce5626/tajo-core/src/main/resources/webapps/admin/query.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp index 7422a03..bcf7766 100644 --- a/tajo-core/src/main/resources/webapps/admin/query.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query.jsp @@ -59,10 +59,8 @@ } } - List<QueryInfo> finishedQueries = new ArrayList<QueryInfo>( - master.getContext().getQueryJobManager().getFinishedQueries(currentPage, pageSize)); - Collections.sort(finishedQueries, java.util.Collections.reverseOrder()); - + List<QueryInfo> finishedQueries = + master.getContext().getQueryJobManager().getFinishedQueries(currentPage, pageSize); SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Map<Integer, NodeStatus> workers = master.getContext().getResourceManager().getNodes(); http://git-wip-us.apache.org/repos/asf/tajo/blob/d8ce5626/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java index aee418c..3d2578c 100644 --- a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java +++ b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java @@ -45,6 +45,7 @@ import static org.junit.Assert.*; public class TestHistoryWriterReader extends QueryTestCaseBase { public static final String HISTORY_DIR = "/tmp/tajo-test-history"; TajoConf tajoConf; + @Before public void setUp() throws Exception { tajoConf = new TajoConf(testingCluster.getConfiguration()); @@ -119,6 +120,69 @@ public class TestHistoryWriterReader extends QueryTestCaseBase { } @Test + public void testQueryInfoPagination() throws Exception { + HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true); + try { + writer.init(tajoConf); + writer.start(); + + long startTime = System.currentTimeMillis(); + int testSize = 10; + QueryInfo queryInfo; + + for (int i = 1; i < testSize + 1; i++) { + queryInfo = new QueryInfo(QueryIdFactory.newQueryId(startTime, i)); + queryInfo.setStartTime(startTime); + queryInfo.setProgress(1.0f); + queryInfo.setQueryState(QueryState.QUERY_SUCCEEDED); + + if (testSize == i) { + writer.appendAndSync(queryInfo); + } else { + writer.appendHistory(queryInfo); + } + } + + SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd"); + Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR)); + + FileSystem fs = path.getFileSystem(tajoConf); + Path parentPath = new Path(path, df.format(startTime) + "/query-list"); + FileStatus[] histFiles = fs.listStatus(parentPath); + assertNotNull(histFiles); + assertEquals(1, histFiles.length); + assertTrue(histFiles[0].isFile()); + assertTrue(histFiles[0].getPath().getName().endsWith(".hist")); + + HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf); + List<QueryInfo> queryInfos = reader.getQueriesInHistory(1, testSize); + assertNotNull(queryInfos); + assertEquals(testSize, queryInfos.size()); + + // the pagination api returns a descending ordered list + for (int i = 0; i < testSize; i++) { + assertEquals(testSize - i, queryInfos.get(i).getQueryId().getSeq()); + } + + int pages = 5; + int pageSize = testSize / pages; + int expectIdSequence = testSize; + //min startIndex of page is 1 + for (int i = 1; i < pages + 1; i++) { + queryInfos = reader.getQueriesInHistory(i, pageSize); + assertNotNull(queryInfos); + assertEquals(pageSize, queryInfos.size()); + + for (QueryInfo qInfo : queryInfos) { + assertEquals(expectIdSequence--, qInfo.getQueryId().getSeq()); + } + } + } finally { + writer.stop(); + } + } + + @Test public void testQueryHistoryReadAndWrite() throws Exception { HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true); writer.init(tajoConf);
