TAJO-1289: History reader fails to get the query information after a successful query execution. (jinho)
Closes #356 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/a15b5fab Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/a15b5fab Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/a15b5fab Branch: refs/heads/index_support Commit: a15b5fab7b1475f5cb4e5eba842f1c4b17166b58 Parents: 17c6dff Author: jhkim <[email protected]> Authored: Fri Jan 23 15:40:07 2015 +0900 Committer: jhkim <[email protected]> Committed: Fri Jan 23 15:40:07 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/conf/TajoConf.java | 2 + .../main/java/org/apache/tajo/util/Bytes.java | 6 +- .../org/apache/tajo/master/QueryInProgress.java | 6 +- .../java/org/apache/tajo/master/QueryInfo.java | 7 +- .../org/apache/tajo/master/QueryManager.java | 25 +- .../apache/tajo/querymaster/QueryMaster.java | 8 +- .../apache/tajo/util/history/HistoryReader.java | 33 +- .../apache/tajo/util/history/HistoryWriter.java | 361 ++++++++++++++----- .../apache/tajo/worker/TaskRunnerManager.java | 1 + .../org/apache/tajo/client/TestTajoClient.java | 16 +- .../util/history/TestHistoryWriterReader.java | 19 +- 12 files changed, 335 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 4e14c93..6969bf1 100644 --- a/CHANGES +++ b/CHANGES @@ -171,6 +171,9 @@ Release 0.10.0 - unreleased BUG FIXES + TAJO-1289: History reader fails to get the query information after + a successful query execution. (jinho) + TAJO-1303: CDH cannot pass hadoop version check test. (Keuntae Park via jihun) http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 74a9271..195743a 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -268,6 +268,8 @@ public class TajoConf extends Configuration { HISTORY_QUERY_DIR("tajo.history.query.dir", STAGING_ROOT_DIR.defaultVal + "/history"), HISTORY_TASK_DIR("tajo.history.task.dir", "file:///tmp/tajo-${user.name}/history"), HISTORY_EXPIRY_TIME_DAY("tajo.history.expiry-time-day", 7), + HISTORY_QUERY_REPLICATION("tajo.history.query.replication", 1, Validators.min("1")), + HISTORY_TASK_REPLICATION("tajo.history.task.replication", 1, Validators.min("1")), // Misc ------------------------------------------------------------------- // Fragment http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java b/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java index 405ec2f..8191cb6 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java @@ -55,14 +55,14 @@ import static com.google.common.base.Preconditions.*; public class Bytes { //HConstants.UTF8_ENCODING should be updated if this changed /** When we encode strings, we always specify UTF8 encoding */ - private static final String UTF8_ENCODING = "UTF-8"; + public static final String UTF8_ENCODING = "UTF-8"; //HConstants.UTF8_CHARSET should be updated if this changed /** When we encode strings, we always specify UTF8 encoding */ - private static final Charset UTF8_CHARSET = Charset.forName(UTF8_ENCODING); + public static final Charset UTF8_CHARSET = Charset.forName(UTF8_ENCODING); //HConstants.EMPTY_BYTE_ARRAY should be updated if this changed - private static final byte [] EMPTY_BYTE_ARRAY = new byte [0]; + public static final byte [] EMPTY_BYTE_ARRAY = new byte [0]; private static final Log LOG = LogFactory.getLog(Bytes.class); http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index df461c8..7e2c05f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -103,7 +103,11 @@ public class QueryInProgress { RpcConnectionPool.getPool().closeConnection(queryMasterRpc); } - masterContext.getHistoryWriter().appendHistory(queryInfo); + try { + masterContext.getHistoryWriter().appendAndFlush(queryInfo); + } catch (Throwable e) { + LOG.warn(e); + } } public boolean startQueryMaster() { http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java index f902081..b11fd99 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java @@ -30,7 +30,7 @@ import org.apache.tajo.json.GsonObject; import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.util.history.History; -public class QueryInfo implements GsonObject, History { +public class QueryInfo implements GsonObject, History, Comparable<QueryInfo> { private QueryId queryId; @Expose private QueryContext context; @@ -232,4 +232,9 @@ public class QueryInfo implements GsonObject, History { return builder.build(); } + + @Override + public int compareTo(QueryInfo o) { + return queryId.compareTo(o.queryId); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index eebefa7..bc6f07b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -20,6 +20,8 @@ package org.apache.tajo.master; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.collections.map.LRUMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -36,10 +38,12 @@ import org.apache.tajo.master.scheduler.SimpleFifoScheduler; import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.querymaster.QueryJobEvent; import org.apache.tajo.session.Session; +import org.apache.tajo.util.history.HistoryReader; import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; /** @@ -59,6 +63,7 @@ public class QueryManager extends CompositeService { private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap(); private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap(); + private final LRUMap historyCache = new LRUMap(HistoryReader.DEFAULT_PAGE_SIZE); private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE); private AtomicLong maxExecutionTime = new AtomicLong(); @@ -121,19 +126,27 @@ public class QueryManager extends CompositeService { public synchronized Collection<QueryInfo> getFinishedQueries() { try { - return this.masterContext.getHistoryReader().getQueries(null); + Set<QueryInfo> result = Sets.newTreeSet(); + result.addAll(this.masterContext.getHistoryReader().getQueries(null)); + synchronized (historyCache) { + result.addAll(historyCache.values()); + } + return result; } catch (Throwable e) { LOG.error(e); return Lists.newArrayList(); } } - public synchronized QueryInfo getFinishedQuery(QueryId queryId) { try { - return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString()); + QueryInfo queryInfo = (QueryInfo) historyCache.get(queryId); + if (queryInfo == null) { + queryInfo = this.masterContext.getHistoryReader().getQueryInfo(queryId.toString()); + } + return queryInfo; } catch (Throwable e) { - LOG.error(e); + LOG.error(e.getMessage(), e); return null; } } @@ -235,6 +248,10 @@ public class QueryManager extends CompositeService { } QueryInfo queryInfo = queryInProgress.getQueryInfo(); + synchronized (historyCache) { + historyCache.put(queryInfo.getQueryId(), queryInfo); + } + long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime(); if (executionTime < minExecutionTime.get()) { minExecutionTime.set(executionTime); http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index be78fc3..1496b62 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -395,8 +395,12 @@ public class QueryMaster extends CompositeService implements EventHandler { if (query != null) { QueryHistory queryHisory = query.getQueryHistory(); if (queryHisory != null) { - query.context.getQueryMasterContext().getWorkerContext(). - getTaskHistoryWriter().appendHistory(queryHisory); + try { + query.context.getQueryMasterContext().getWorkerContext(). + getTaskHistoryWriter().appendAndFlush(queryHisory); + } catch (Throwable e) { + LOG.warn(e); + } } } if(workerContext.isYarnContainerMode()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java index c3f0087..f4719b2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java @@ -24,10 +24,12 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto; import org.apache.tajo.master.QueryInfo; +import org.apache.tajo.util.Bytes; import java.io.EOFException; import java.io.IOException; @@ -56,30 +58,33 @@ public class HistoryReader { List<QueryInfo> queryInfos = new ArrayList<QueryInfo>(); FileSystem fs = HistoryWriter.getNonCrcFileSystem(historyParentPath, tajoConf); - if (!fs.exists(historyParentPath)) { + try { + if (!fs.exists(historyParentPath)) { + return queryInfos; + } + } catch (Throwable e){ return queryInfos; } + FileStatus[] files = fs.listStatus(historyParentPath); if (files == null || files.length == 0) { return queryInfos; } for (FileStatus eachDateFile: files) { - if (eachDateFile.isFile()) { + Path queryListPath = new Path(eachDateFile.getPath(), HistoryWriter.QUERY_LIST); + if (eachDateFile.isFile() || !fs.exists(queryListPath)) { continue; } - FileStatus[] dateFiles = fs.listStatus(new Path(eachDateFile.getPath(), HistoryWriter.QUERY_LIST)); + + FileStatus[] dateFiles = fs.listStatus(queryListPath); if (dateFiles == null || dateFiles.length == 0) { continue; } for (FileStatus eachFile: dateFiles) { - if (eachFile.isDirectory()) { - continue; - } - Path path = eachFile.getPath(); - if (!path.getName().endsWith(HistoryWriter.HISTORY_FILE_POSTFIX)) { + if (eachFile.isDirectory() || !path.getName().endsWith(HistoryWriter.HISTORY_FILE_POSTFIX)) { continue; } @@ -94,7 +99,7 @@ public class HistoryReader { buf = new byte[length]; } in.readFully(buf, 0, length); - String queryInfoJson = new String(buf, 0, length); + String queryInfoJson = new String(buf, 0, length, Bytes.UTF8_CHARSET); QueryInfo queryInfo = QueryInfo.fromJson(queryInfoJson); if (keyword != null) { if (queryInfo.getSql().indexOf(keyword) >= 0) { @@ -105,10 +110,10 @@ public class HistoryReader { } } } catch (EOFException e) { - } catch (Exception e) { - LOG.error("Reading error:" + path + ", " +e.getMessage(), e); + } catch (Throwable e) { + LOG.warn("Reading error:" + path + ", " +e.getMessage()); } finally { - in.close(); + IOUtils.cleanup(LOG, in); } } } @@ -178,7 +183,7 @@ public class HistoryReader { in.readFully(buf, 0, buf.length); - return QueryHistory.fromJson(new String(buf)); + return QueryHistory.fromJson(new String(buf, Bytes.UTF8_CHARSET)); } finally { if (in != null) { in.close(); @@ -210,7 +215,7 @@ public class HistoryReader { in.readFully(buf, 0, buf.length); - return StageHistory.fromJsonTasks(new String(buf)); + return StageHistory.fromJsonTasks(new String(buf, Bytes.UTF8_CHARSET)); } finally { if (in != null) { in.close(); http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java index 9eb58da..3fea3ef 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java @@ -18,19 +18,26 @@ package org.apache.tajo.util.history; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.service.AbstractService; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.QueryInfo; +import org.apache.tajo.util.Bytes; import org.apache.tajo.worker.TaskHistory; +import java.io.Closeable; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.*; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -50,7 +57,8 @@ public class HistoryWriter extends AbstractService { public static final String QUERY_DETAIL = "query-detail"; public static final String HISTORY_FILE_POSTFIX = ".hist"; - private final LinkedBlockingQueue<History> historyQueue = new LinkedBlockingQueue<History>(); + private final LinkedBlockingQueue<WriterFuture<WriterHolder>> + historyQueue = new LinkedBlockingQueue<WriterFuture<WriterHolder>>(); // key: yyyyMMddHH private Map<String, WriterHolder> taskWriters = new HashMap<String, WriterHolder>(); @@ -65,6 +73,8 @@ public class HistoryWriter extends AbstractService { private TajoConf tajoConf; private HistoryCleaner historyCleaner; private boolean isMaster; + private short queryReplication; + private short taskReplication; public HistoryWriter(String processName, boolean isMaster) { super(HistoryWriter.class.getName() + ":" + processName); @@ -79,31 +89,25 @@ public class HistoryWriter extends AbstractService { taskHistoryParentPath = tajoConf.getTaskHistoryDir(tajoConf); writerThread = new WriterThread(); historyCleaner = new HistoryCleaner(tajoConf, isMaster); + queryReplication = (short) tajoConf.getIntVar(TajoConf.ConfVars.HISTORY_QUERY_REPLICATION); + taskReplication = (short) tajoConf.getIntVar(TajoConf.ConfVars.HISTORY_TASK_REPLICATION); super.serviceInit(conf); } @Override public void serviceStop() throws Exception { + if(stopped.getAndSet(true)){ + return; + } + for (WriterHolder eachWriter : taskWriters.values()) { - if (eachWriter.out != null) { - try { - eachWriter.out.close(); - } catch (Exception err) { - LOG.error(err.getMessage(), err); - } - } + IOUtils.cleanup(LOG, eachWriter); } + taskWriters.clear(); - stopped.set(true); writerThread.interrupt(); - if (querySummaryWriter != null && querySummaryWriter.out != null) { - try { - querySummaryWriter.out.close(); - } catch (Exception err) { - LOG.error(err.getMessage(), err); - } - } + IOUtils.cleanup(LOG, querySummaryWriter); if (historyCleaner != null) { historyCleaner.doStop(); @@ -117,10 +121,53 @@ public class HistoryWriter extends AbstractService { historyCleaner.start(); } - public void appendHistory(History history) { - synchronized (historyQueue) { - historyQueue.add(history); - historyQueue.notifyAll(); + /* asynchronously append to history file */ + public WriterFuture<WriterHolder> appendHistory(History history) { + WriterFuture<WriterHolder> future = new WriterFuture<WriterHolder>(history); + historyQueue.add(future); + return future; + } + + /* asynchronously flush to history file */ + public synchronized WriterFuture<WriterHolder> appendAndFlush(History history) { + WriterFuture<WriterHolder> future = new WriterFuture<WriterHolder>(history) { + public void done(WriterHolder holder) { + try { + if (holder != null) holder.flush(); + super.done(holder); + } catch (IOException e) { + super.failed(e); + } + } + }; + historyQueue.add(future); + synchronized (writerThread) { + writerThread.notifyAll(); + } + return future; + } + + /* synchronously flush to history file */ + public synchronized void appendAndSync(History history) + throws TimeoutException, InterruptedException, IOException { + + WriterFuture<WriterHolder> future = appendAndFlush(history); + + future.get(5, TimeUnit.SECONDS); + if(!future.isSucceed()){ + throw new IOException(future.getError()); + } + } + + /* Flushing the buffer */ + public synchronized void flushTaskHistories() { + if (historyQueue.size() > 0) { + synchronized (writerThread) { + writerThread.needTaskFlush.set(true); + writerThread.notifyAll(); + } + } else { + writerThread.flushTaskHistories(); } } @@ -146,7 +193,7 @@ public class HistoryWriter extends AbstractService { public static Path getQueryHistoryFilePath(Path historyParentPath, String queryId) { SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd"); - Path datePath = null; + Path datePath; try { String[] tokens = queryId.split("_"); //q_1412483083972_0005 = q_<timestamp>_<seq> @@ -162,31 +209,27 @@ public class HistoryWriter extends AbstractService { } class WriterThread extends Thread { + private AtomicBoolean needTaskFlush = new AtomicBoolean(false); + public void run() { - LOG.info("HistoryWriter_"+ processName + " started."); + LOG.info("HistoryWriter_" + processName + " started."); SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH"); while (!stopped.get()) { - List<History> histories = new ArrayList<History>(); - synchronized (historyQueue) { - historyQueue.drainTo(histories); - if (histories.isEmpty()) { - try { - historyQueue.wait(60 * 1000); - } catch (InterruptedException e) { - if (stopped.get()) { - break; - } - } - } - } - if (stopped.get()) { - break; + List<WriterFuture<WriterHolder>> histories = Lists.newArrayList(); + + try { + drainHistory(histories, 100, 1000); + } catch (InterruptedException e) { + if (stopped.get()) break; } + try { if (!histories.isEmpty()) { writeHistory(histories); + } else { + continue; } - } catch (Exception e) { + } catch (Throwable e) { LOG.error(e.getMessage(), e); } @@ -207,60 +250,91 @@ public class HistoryWriter extends AbstractService { } for (String eachWriterTime : closingTargets) { - WriterHolder writerHolder = null; + WriterHolder writerHolder; synchronized (taskWriters) { writerHolder = taskWriters.remove(eachWriterTime); } + if (writerHolder != null) { LOG.info("Closing task history file: " + writerHolder.path); - if (writerHolder.out != null) { - try { - writerHolder.out.close(); - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } + IOUtils.cleanup(LOG, writerHolder); + } + } + } + } + LOG.info("HistoryWriter_" + processName + " stopped."); + } + + private int drainHistory(Collection<WriterFuture<WriterHolder>> buffer, int numElements, + long timeoutMillis) throws InterruptedException { + + long deadline = System.currentTimeMillis() + timeoutMillis; + int added = 0; + while (added < numElements) { + added += historyQueue.drainTo(buffer, numElements - added); + if (added < numElements) { // not enough elements immediately available; will have to wait + if (deadline <= System.currentTimeMillis()) { + break; + } else { + synchronized (writerThread) { + writerThread.wait(deadline - System.currentTimeMillis()); + if (deadline > System.currentTimeMillis()) { + added += historyQueue.drainTo(buffer, numElements - added); + break; } } } } } - LOG.info("HistoryWriter_"+ processName + " stopped."); + return added; } - public void writeHistory(List<History> histories) { + private List<WriterFuture<WriterHolder>> writeHistory(List<WriterFuture<WriterHolder>> histories) { + if (histories.isEmpty()) { - return; + return histories; } - for (History eachHistory : histories) { - switch(eachHistory.getHistoryType()) { + + for (WriterFuture<WriterHolder> future : histories) { + History history = future.getHistory(); + switch (history.getHistoryType()) { case TASK: try { - writeTaskHistory((TaskHistory) eachHistory); - } catch (Exception e) { + future.done(writeTaskHistory((TaskHistory) history)); + } catch (Throwable e) { LOG.error("Error while saving task history: " + - ((TaskHistory) eachHistory).getTaskAttemptId() + ":" + e.getMessage(), e); + ((TaskHistory) history).getTaskAttemptId() + ":" + e.getMessage(), e); + future.failed(e); } break; case QUERY: try { - writeQueryHistory((QueryHistory) eachHistory); - } catch (Exception e) { + writeQueryHistory((QueryHistory) history); + future.done(null); + } catch (Throwable e) { LOG.error("Error while saving query history: " + - ((QueryHistory) eachHistory).getQueryId() + ":" + e.getMessage(), e); + ((QueryHistory) history).getQueryId() + ":" + e.getMessage(), e); + future.failed(e); } break; case QUERY_SUMMARY: try { - writeQuerySummary((QueryInfo) eachHistory); - } catch (Exception e) { + future.done(writeQuerySummary((QueryInfo) history)); + } catch (Throwable e) { LOG.error("Error while saving query summary: " + - ((QueryInfo) eachHistory).getQueryId() + ":" + e.getMessage(), e); + ((QueryInfo) history).getQueryId() + ":" + e.getMessage(), e); + future.failed(e); } break; default: - LOG.warn("Wrong history type: " + eachHistory.getHistoryType()); + LOG.warn("Wrong history type: " + history.getHistoryType()); } } + + if(needTaskFlush.getAndSet(false)){ + flushTaskHistories(); + } + return histories; } private synchronized void writeQueryHistory(QueryHistory queryHistory) throws Exception { @@ -283,16 +357,10 @@ public class HistoryWriter extends AbstractService { FSDataOutputStream out = null; try { LOG.info("Saving query summary: " + queryHistoryFile); - out = fs.create(queryHistoryFile); - out.write(queryHistory.toJson().getBytes()); + out = fs.create(queryHistoryFile, queryReplication); + out.write(queryHistory.toJson().getBytes(Bytes.UTF8_CHARSET)); } finally { - if (out != null) { - try { - out.close(); - } catch (Exception err) { - LOG.error(err.getMessage(), err); - } - } + IOUtils.cleanup(LOG, out); } if (queryHistory.getStageHistories() != null) { @@ -300,24 +368,18 @@ public class HistoryWriter extends AbstractService { Path path = new Path(queryHistoryFile.getParent(), stageHistory.getExecutionBlockId() + HISTORY_FILE_POSTFIX); out = null; try { - out = fs.create(path); - out.write(stageHistory.toTasksJson().getBytes()); + out = fs.create(path, queryReplication); + out.write(stageHistory.toTasksJson().getBytes(Bytes.UTF8_CHARSET)); LOG.info("Saving query unit: " + path); } finally { - if (out != null) { - try { - out.close(); - } catch (Exception err) { - LOG.error(err.getMessage(), err); - } - } + IOUtils.cleanup(LOG, out); } } } } - private synchronized void writeQuerySummary(QueryInfo queryInfo) throws Exception { - if(stopped.get()) return; + private synchronized WriterHolder writeQuerySummary(QueryInfo queryInfo) throws Exception { + if(stopped.get()) return null; // writing to HDFS and rolling hourly if (querySummaryWriter == null) { @@ -327,18 +389,21 @@ public class HistoryWriter extends AbstractService { if (querySummaryWriter.out == null) { rollingQuerySummaryWriter(); } else if (System.currentTimeMillis() - querySummaryWriter.lastWritingTime >= 60 * 60 * 1000) { - if (querySummaryWriter.out != null) { - LOG.info("Close query history file: " + querySummaryWriter.path); - querySummaryWriter.out.close(); - } + LOG.info("Close query history file: " + querySummaryWriter.path); + IOUtils.cleanup(LOG, querySummaryWriter); rollingQuerySummaryWriter(); } } - byte[] jsonBytes = ("\n" + queryInfo.toJson() + "\n").getBytes(); - - querySummaryWriter.out.writeInt(jsonBytes.length); - querySummaryWriter.out.write(jsonBytes); - querySummaryWriter.out.hflush(); + byte[] jsonBytes = ("\n" + queryInfo.toJson() + "\n").getBytes(Bytes.UTF8_CHARSET); + try { + querySummaryWriter.out.writeInt(jsonBytes.length); + querySummaryWriter.out.write(jsonBytes); + } catch (IOException ie) { + IOUtils.cleanup(LOG, querySummaryWriter); + querySummaryWriter.out = null; + throw ie; + } + return querySummaryWriter; } private synchronized void rollingQuerySummaryWriter() throws Exception { @@ -359,10 +424,22 @@ public class HistoryWriter extends AbstractService { querySummaryWriter.path = historyFile; querySummaryWriter.lastWritingTime = System.currentTimeMillis(); LOG.info("Create query history file: " + historyFile); - querySummaryWriter.out = fs.create(historyFile); + querySummaryWriter.out = fs.create(historyFile, queryReplication); + } + + private void flushTaskHistories() { + synchronized (taskWriters) { + for (WriterHolder holder : taskWriters.values()) { + try { + holder.flush(); + } catch (IOException e) { + LOG.warn(e); + } + } + } } - private synchronized void writeTaskHistory(TaskHistory taskHistory) throws Exception { + private synchronized WriterHolder writeTaskHistory(TaskHistory taskHistory) throws Exception { SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH"); String taskStartTime = df.format(new Date(taskHistory.getStartTime())); @@ -378,11 +455,17 @@ public class HistoryWriter extends AbstractService { writerHolder.lastWritingTime = System.currentTimeMillis(); if (writerHolder.out != null) { - byte[] taskHistoryBytes = taskHistory.getProto().toByteArray(); - writerHolder.out.writeInt(taskHistoryBytes.length); - writerHolder.out.write(taskHistoryBytes); - writerHolder.out.flush(); + try { + byte[] taskHistoryBytes = taskHistory.getProto().toByteArray(); + writerHolder.out.writeInt(taskHistoryBytes.length); + writerHolder.out.write(taskHistoryBytes); + } catch (IOException ie) { + taskWriters.remove(taskStartTime); + IOUtils.cleanup(LOG, writerHolder); + throw ie; + } } + return writerHolder; } private FSDataOutputStream createTaskHistoryFile(String taskStartTime, WriterHolder writerHolder) throws IOException { @@ -395,7 +478,7 @@ public class HistoryWriter extends AbstractService { } } writerHolder.path = path; - return fs.create(path, false); + return fs.create(path, false, 4096, taskReplication, fs.getDefaultBlockSize(path)); } } @@ -444,9 +527,89 @@ public class HistoryWriter extends AbstractService { return new Path(fileParent, processName + "_" + hour + "_" + maxSeq + HISTORY_FILE_POSTFIX); } - class WriterHolder { + static class WriterHolder implements Closeable { long lastWritingTime; Path path; FSDataOutputStream out; + + @Override + public synchronized void close() throws IOException { + if (out != null) out.close(); + } + + /* + * Sync buffered data to DataNodes or disks (flush to disk devices). + */ + private synchronized void flush() throws IOException { + if (out != null) out.hsync(); + } + } + + static class WriterFuture<T> implements Future<T> { + private boolean done = false; + private T result; + private History history; + private Throwable error; + private CountDownLatch latch = new CountDownLatch(1); + + public WriterFuture(History history) { + this.history = history; + } + + private History getHistory() { + return history; + } + + public void done(T t) { + this.result = t; + this.done = true; + this.latch.countDown(); + } + + public void failed(Throwable e) { + this.error = e; + done(null); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + // TODO - to be implemented + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCancelled() { + // TODO - to be implemented + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDone() { + return done; + } + + public boolean isSucceed() { + return error == null; + } + + public Throwable getError() { + return error; + } + + @Override + public T get() throws InterruptedException { + this.latch.await(); + return result; + } + + @Override + public T get(long timeout, TimeUnit unit) + throws InterruptedException, TimeoutException { + if (latch.await(timeout, unit)) { + return result; + } else { + throw new TimeoutException(); + } + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java index f837b11..3c1fcc5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java @@ -192,6 +192,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< TupleCache.getInstance().removeBroadcastCache(event.getExecutionBlockId()); executionBlockContext.reportExecutionBlock(event.getExecutionBlockId()); workerContext.getHashShuffleAppenderManager().close(event.getExecutionBlockId()); + workerContext.getTaskHistoryWriter().flushTaskHistories(); } catch (IOException e) { LOG.fatal(e.getMessage(), e); throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java index 113288a..63ea8b9 100644 --- a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java +++ b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java @@ -638,9 +638,8 @@ public class TestTajoClient { QueryId queryId = new QueryId(response.getQueryId()); try { - long startTime = System.currentTimeMillis(); while (true) { - Thread.sleep(5 * 1000); + Thread.sleep(100); List<ClientProtos.BriefQueryInfo> finishedQueries = client.getFinishedQueryList(); boolean finished = false; @@ -656,9 +655,6 @@ public class TestTajoClient { if (finished) { break; } - if(System.currentTimeMillis() - startTime > 20 * 1000) { - fail("Too long time execution query"); - } } QueryStatus queryStatus = client.getQueryStatus(queryId); @@ -755,7 +751,7 @@ public class TestTajoClient { assertEquals(expected, resultDatas); } - @Test + @Test(timeout = 30000) public void testGetQueryInfoAndHistory() throws Exception { String sql = "select count(*) from lineitem"; ClientProtos.SubmitQueryResponse response = client.executeQuery(sql); @@ -763,8 +759,7 @@ public class TestTajoClient { assertNotNull(response); QueryId queryId = new QueryId(response.getQueryId()); - QueryInfoProto queryInfo = null; - long startTime = System.currentTimeMillis(); + QueryInfoProto queryInfo; while (true) { queryInfo = client.getQueryInfo(queryId); @@ -772,12 +767,7 @@ public class TestTajoClient { break; } Thread.sleep(100); - - if (System.currentTimeMillis() - startTime > 30 * 1000) { - fail("Too long running query"); - } } - Thread.sleep(5 * 1000); assertNotNull(queryInfo); assertEquals(queryId.toString(), queryInfo.getQueryId()); http://git-wip-us.apache.org/repos/asf/tajo/blob/a15b5fab/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java index 45282aa..f442bde 100644 --- a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java +++ b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java @@ -41,7 +41,6 @@ import java.util.Date; import java.util.List; import static org.junit.Assert.*; -import static org.junit.Assert.assertTrue; public class TestHistoryWriterReader extends QueryTestCaseBase { public static final String HISTORY_DIR = "/tmp/tajo-test-history"; @@ -71,17 +70,13 @@ public class TestHistoryWriterReader extends QueryTestCaseBase { queryInfo1.setStartTime(startTime); queryInfo1.setProgress(1.0f); queryInfo1.setQueryState(QueryState.QUERY_SUCCEEDED); + writer.appendHistory(queryInfo1); QueryInfo queryInfo2 = new QueryInfo(QueryIdFactory.newQueryId(startTime, 2)); queryInfo2.setStartTime(startTime); queryInfo2.setProgress(0.5f); queryInfo2.setQueryState(QueryState.QUERY_FAILED); - - writer.appendHistory(queryInfo1); - writer.appendHistory(queryInfo2); - - // HistoryWriter writes asynchronous. - Thread.sleep(5 * 1000); + writer.appendAndSync(queryInfo2); SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd"); Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR)); @@ -144,10 +139,7 @@ public class TestHistoryWriterReader extends QueryTestCaseBase { } queryHistory.setStageHistories(stages); - writer.appendHistory(queryHistory); - - // HistoryWriter writes asynchronous. - Thread.sleep(5 * 1000); + writer.appendAndSync(queryHistory); SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd"); Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR)); @@ -217,10 +209,7 @@ public class TestHistoryWriterReader extends QueryTestCaseBase { TaskAttemptId id2 = TajoIdUtils.parseTaskAttemptId("ta_1412326813565_0001_000001_000002_00"); org.apache.tajo.worker.TaskHistory taskHistory2 = new org.apache.tajo.worker.TaskHistory( id2, TaskAttemptState.TA_SUCCEEDED, 1.0f, startTime, System.currentTimeMillis() - 500, tableStats); - writer.appendHistory(taskHistory2); - - // HistoryWriter writes asynchronous. - Thread.sleep(5 * 1000); + writer.appendAndSync(taskHistory2); SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH"); String startDate = df.format(new Date(startTime));
