TAJO-1026: Implement query history persistence manager. Closes #179
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/e01b00a7 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/e01b00a7 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/e01b00a7 Branch: refs/heads/master Commit: e01b00a7b05b98cc407ced04dad1c2f2144d0f88 Parents: a3e5bdd Author: HyoungJun Kim <[email protected]> Authored: Tue Nov 11 10:30:22 2014 +0900 Committer: HyoungJun Kim <[email protected]> Committed: Tue Nov 11 10:30:22 2014 +0900 ---------------------------------------------------------------------- CHANGES | 4 +- .../org/apache/tajo/client/QueryClient.java | 6 + .../org/apache/tajo/client/QueryClientImpl.java | 52 +++ .../org/apache/tajo/client/TajoClientImpl.java | 13 +- tajo-client/src/main/proto/ClientProtos.proto | 60 +++ .../main/proto/QueryMasterClientProtocol.proto | 1 + .../main/proto/TajoMasterClientProtocol.proto | 1 + .../java/org/apache/tajo/conf/TajoConf.java | 26 ++ .../java/org/apache/tajo/master/TajoMaster.java | 21 + .../tajo/master/TajoMasterClientService.java | 33 ++ .../apache/tajo/master/querymaster/Query.java | 39 ++ .../master/querymaster/QueryInProgress.java | 3 + .../tajo/master/querymaster/QueryInfo.java | 79 +++- .../tajo/master/querymaster/QueryMaster.java | 89 ++-- .../tajo/master/querymaster/QueryUnit.java | 79 ++++ .../tajo/master/querymaster/SubQuery.java | 76 ++++ .../main/java/org/apache/tajo/util/JSPUtil.java | 162 ++++++- .../org/apache/tajo/util/history/History.java | 27 ++ .../tajo/util/history/HistoryCleaner.java | 136 ++++++ .../apache/tajo/util/history/HistoryReader.java | 308 +++++++++++++ .../apache/tajo/util/history/HistoryWriter.java | 450 +++++++++++++++++++ .../apache/tajo/util/history/QueryHistory.java | 151 +++++++ .../tajo/util/history/QueryUnitHistory.java | 167 +++++++ .../tajo/util/history/SubQueryHistory.java | 270 +++++++++++ .../java/org/apache/tajo/worker/TajoWorker.java | 20 + .../tajo/worker/TajoWorkerClientService.java | 34 
++ .../main/java/org/apache/tajo/worker/Task.java | 5 +- .../org/apache/tajo/worker/TaskHistory.java | 8 +- .../src/main/resources/webapps/admin/query.jsp | 72 +-- .../resources/webapps/admin/querydetail.jsp | 116 +++++ .../main/resources/webapps/admin/querytasks.jsp | 249 ++++++++++ .../main/resources/webapps/admin/queryunit.jsp | 134 ++++++ .../resources/webapps/worker/querydetail.jsp | 43 +- .../resources/webapps/worker/querytasks.jsp | 126 ++++-- .../resources/webapps/worker/taskhistory.jsp | 123 +++++ .../org/apache/tajo/client/TestTajoClient.java | 54 +++ .../java/org/apache/tajo/util/TestJSPUtil.java | 37 +- .../util/history/TestHistoryWriterReader.java | 251 +++++++++++ 38 files changed, 3384 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 665313b..6c53920 100644 --- a/CHANGES +++ b/CHANGES @@ -5,12 +5,14 @@ Release 0.9.1 - unreleased NEW FEATURES + TAJO-1026: Implement Query history persistency manager.(Hyoungjun Kim) + TAJO-233: Support PostgreSQL CatalogStore. (Jihun Kang via hyunsik) TAJO-235: Support Oracle CatalogStore. (Jihun Kang via hyunsik) IMPROVEMENT - + TAJO-1133: Add 'bin/tajo version' command. (Jihun Kang via hyunsik) TAJO-1145: Add 'bin/tajo --help' command. 
(Jihun Kang via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java index 59ef52b..9b24663 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java @@ -22,6 +22,8 @@ import com.google.protobuf.ServiceException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.tajo.QueryId; import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.ipc.ClientProtos.QueryHistoryProto; +import org.apache.tajo.ipc.ClientProtos.QueryInfoProto; import org.apache.tajo.jdbc.TajoMemoryResultSet; import java.io.Closeable; @@ -111,4 +113,8 @@ public interface QueryClient extends Closeable { public List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws ServiceException; public QueryStatus killQuery(final QueryId queryId) throws ServiceException, IOException; + + public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException; + + public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index f92c9bf..5b78959 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -618,4 +618,56 @@ public class QueryClientImpl implements QueryClient { } return status; } + + public 
QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException { + return new ServerCallable<QueryInfoProto>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + public QueryInfoProto call(NettyClientBase client) throws ServiceException { + connection.checkSessionAndGet(client); + + QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQueryId(queryId.getProto()); + + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build()); + if (res.getResultCode() == ResultCode.OK) { + return res.getQueryInfo(); + } else { + abort(); + throw new ServiceException(res.getErrorMessage()); + } + } + }.withRetries(); + } + + public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException { + final QueryInfoProto queryInfo = getQueryInfo(queryId); + + if (queryInfo.getHostNameOfQM() == null || queryInfo.getQueryMasterClientPort() == 0) { + return null; + } + InetSocketAddress qmAddress = new InetSocketAddress( + queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort()); + + return new ServerCallable<QueryHistoryProto>(connection.connPool, qmAddress, + QueryMasterClientProtocol.class, false, true) { + public QueryHistoryProto call(NettyClientBase client) throws ServiceException { + connection.checkSessionAndGet(client); + + QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQueryId(queryId.getProto()); + + QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub(); + GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build()); + if (res.getResultCode() == ResultCode.OK) { + return res.getQueryHistory(); + } else { + abort(); + throw new ServiceException(res.getErrorMessage()); + } + } + 
}.withRetries(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java index 75de492..1d637ed 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java @@ -32,10 +32,7 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo; -import org.apache.tajo.ipc.ClientProtos.GetQueryResultResponse; -import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; -import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo; +import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.jdbc.TajoResultSet; import org.apache.tajo.util.NetUtils; @@ -158,6 +155,14 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que return queryClient.getClusterInfo(); } + public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException { + return queryClient.getQueryInfo(queryId); + } + + public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException { + return queryClient.getQueryHistory(queryId); + } + /*------------------------------------------------------------------------*/ // CatalogClient wrappers /*------------------------------------------------------------------------*/ http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-client/src/main/proto/ClientProtos.proto ---------------------------------------------------------------------- diff --git 
a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index 23ae6dd..4118458 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -234,3 +234,63 @@ message FunctionResponse { repeated FunctionDescProto functions = 2; optional string errorMessage = 3; } + +message QueryInfoProto { + required string queryId = 1; + optional string sql = 2; + optional QueryState queryState = 3; + optional float progress = 4; + optional int64 startTime = 5; + optional int64 finishTime = 6; + optional string lastMessage = 7; + optional string hostNameOfQM = 8; + optional int32 queryMasterPort = 9; + optional int32 queryMasterClientPort = 10; + optional int32 queryMasterInfoPort = 11; +} + +message SubQueryHistoryProto { + required string executionBlockId =1; + required string state = 2; + optional int64 startTime = 3; + optional int64 finishTime = 4; + optional int32 succeededObjectCount = 5; + optional int32 failedObjectCount = 6; + optional int32 killedObjectCount = 7; + optional int32 totalScheduledObjectsCount = 8; + + optional int64 totalInputBytes = 9; + optional int64 totalReadBytes = 10; + optional int64 totalReadRows = 11; + optional int64 totalWriteBytes = 12; + optional int64 totalWriteRows = 13; + optional int32 numShuffles = 14; + optional float progress =15; + + optional string plan = 16; + optional int32 hostLocalAssigned = 17; + optional int32 rackLocalAssigned = 18; +} + +message QueryHistoryProto { + required string queryId = 1; + optional string queryMaster = 2; + optional int32 httpPort = 3; + optional string logicalPlan = 4; + optional string distributedPlan = 5; + repeated KeyValueProto sessionVariables = 6; + repeated SubQueryHistoryProto subQueryHistories = 7; +} + +message GetQueryHistoryResponse { + required ResultCode resultCode = 1; + optional QueryHistoryProto queryHistory = 2; + optional string errorMessage = 3; +} + +message GetQueryInfoResponse { + 
required ResultCode resultCode = 1; + optional QueryInfoProto queryInfo = 2; + optional string errorMessage = 3; +} + http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-client/src/main/proto/QueryMasterClientProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/QueryMasterClientProtocol.proto b/tajo-client/src/main/proto/QueryMasterClientProtocol.proto index 9d96505..3d8d70b 100644 --- a/tajo-client/src/main/proto/QueryMasterClientProtocol.proto +++ b/tajo-client/src/main/proto/QueryMasterClientProtocol.proto @@ -32,4 +32,5 @@ service QueryMasterClientProtocolService { rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse); rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse); rpc closeQuery(QueryIdProto) returns (BoolProto); + rpc getQueryHistory(QueryIdRequest) returns (GetQueryHistoryResponse); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-client/src/main/proto/TajoMasterClientProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto index 1afc069..bc59617 100644 --- a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto +++ b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto @@ -51,6 +51,7 @@ service TajoMasterClientProtocolService { rpc killQuery(QueryIdRequest) returns (BoolProto); rpc getClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse); rpc closeNonForwardQuery(QueryIdRequest) returns (BoolProto); + rpc getQueryInfo(QueryIdRequest) returns (GetQueryInfoResponse); // Database Management APIs rpc createDatabase(SessionedStringProto) returns (BoolProto); http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java 
---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 786aed0..3966410 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -296,6 +296,11 @@ public class TajoConf extends Configuration { // Metrics ---------------------------------------------------------------- METRICS_PROPERTY_FILENAME("tajo.metrics.property.file", "tajo-metrics.properties"), + // Query History --------------------------------------------------------- + HISTORY_QUERY_DIR("tajo.history.query.dir", STAGING_ROOT_DIR.defaultVal + "/history"), + HISTORY_TASK_DIR("tajo.history.task.dir", "file:///tmp/tajo-${user.name}/history"), + HISTORY_EXPIRY_TIME_DAY("tajo.history.expiry-time-day", 7), + // Misc ------------------------------------------------------------------- // Geo IP @@ -678,6 +683,27 @@ public class TajoConf extends Configuration { return new Path(stagingDirString); } + public static Path getQueryHistoryDir(TajoConf conf) throws IOException { + String historyDirString = conf.getVar(ConfVars.HISTORY_QUERY_DIR); + if (!hasScheme(historyDirString)) { + Path stagingPath = getStagingDir(conf); + FileSystem fs = stagingPath.getFileSystem(conf); + Path path = new Path(fs.getUri().toString(), historyDirString); + conf.setVar(ConfVars.HISTORY_QUERY_DIR, path.toString()); + return path; + } + return new Path(historyDirString); + } + + public static Path getTaskHistoryDir(TajoConf conf) throws IOException { + String historyDirString = conf.getVar(ConfVars.HISTORY_TASK_DIR); + if (!hasScheme(historyDirString)) { + //Local dir + historyDirString = "file://" + historyDirString; + } + return new Path(historyDirString); + } + public static Path getSystemConfPath(TajoConf conf) { String systemConfPathStr = conf.getVar(ConfVars.SYSTEM_CONF_PATH); if 
(systemConfPathStr == null || systemConfPathStr.equals("")) { http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index 25e1be5..17658ac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -52,6 +52,8 @@ import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.util.*; import org.apache.tajo.util.metrics.TajoSystemMetrics; +import org.apache.tajo.util.history.HistoryReader; +import org.apache.tajo.util.history.HistoryWriter; import org.apache.tajo.webapp.QueryExecutorServlet; import org.apache.tajo.webapp.StaticHttpServer; @@ -125,6 +127,10 @@ public class TajoMaster extends CompositeService { private JvmPauseMonitor pauseMonitor; + private HistoryWriter historyWriter; + + private HistoryReader historyReader; + public TajoMaster() throws Exception { super(TajoMaster.class.getName()); } @@ -309,6 +315,13 @@ public class TajoMaster extends CompositeService { } catch (IOException e) { LOG.error(e.getMessage(), e); } + + historyWriter = new HistoryWriter(getMasterName(), true); + historyWriter.init(getConfig()); + addIfService(historyWriter); + historyWriter.start(); + + historyReader = new HistoryReader(getMasterName(), context.getConf()); } private void writeSystemConf() throws IOException { @@ -452,6 +465,14 @@ public class TajoMaster extends CompositeService { public HAService getHAService() { return haService; } + + public HistoryWriter getHistoryWriter() { + return historyWriter; + } + + public HistoryReader getHistoryReader() { + return historyReader; + } } String getThreadTaskName(long id, String name) { 
http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index a4688d9..2c81cd0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -57,6 +57,8 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; +import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.history.QueryHistory; import java.io.IOException; import java.net.InetSocketAddress; @@ -537,6 +539,37 @@ public class TajoMasterClientService extends AbstractService { } } + @Override + public GetQueryInfoResponse getQueryInfo(RpcController controller, QueryIdRequest request) throws ServiceException { + GetQueryInfoResponse.Builder builder = GetQueryInfoResponse.newBuilder(); + + try { + context.getSessionManager().touch(request.getSessionId().getId()); + QueryId queryId = new QueryId(request.getQueryId()); + + QueryJobManager queryJobManager = context.getQueryJobManager(); + QueryInProgress queryInProgress = queryJobManager.getQueryInProgress(queryId); + + QueryInfo queryInfo = null; + if (queryInProgress == null) { + queryInfo = context.getHistoryReader().getQueryInfo(queryId.toString()); + } else { + queryInfo = queryInProgress.getQueryInfo(); + } + + if (queryInfo != null) { + builder.setQueryInfo(queryInfo.getProto()); + } + builder.setResultCode(ResultCode.OK); + } catch (Throwable t) { + LOG.warn(t.getMessage(), t); + builder.setResultCode(ResultCode.ERROR); + 
builder.setErrorMessage(org.apache.hadoop.util.StringUtils.stringifyException(t)); + } + + return builder.build(); + } + /** * It is invoked by TajoContainerProxy. */ http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index a16d36a..7114d39 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -52,6 +52,8 @@ import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.TUtil; +import org.apache.tajo.util.history.QueryHistory; +import org.apache.tajo.util.history.SubQueryHistory; import java.io.IOException; import java.text.NumberFormat; @@ -287,6 +289,42 @@ public class Query implements EventHandler<QueryEvent> { finishTime = clock.getTime(); } + public QueryHistory getQueryHistory() { + QueryHistory queryHistory = makeQueryHistory(); + queryHistory.setSubQueryHistories(makeSubQueryHistories()); + return queryHistory; + } + + private List<SubQueryHistory> makeSubQueryHistories() { + List<SubQueryHistory> subQueryHistories = new ArrayList<SubQueryHistory>(); + for(SubQuery eachSubQuery: getSubQueries()) { + subQueryHistories.add(eachSubQuery.getSubQueryHistory()); + } + + return subQueryHistories; + } + + private QueryHistory makeQueryHistory() { + QueryHistory queryHistory = new QueryHistory(); + + queryHistory.setQueryId(getId().toString()); + queryHistory.setQueryMaster(context.getQueryMasterContext().getWorkerContext().getWorkerName()); + queryHistory.setHttpPort(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getHttpInfoPort()); + 
queryHistory.setLogicalPlan(plan.toString()); + queryHistory.setLogicalPlan(plan.getLogicalPlan().toString()); + queryHistory.setDistributedPlan(plan.toString()); + + List<String[]> sessionVariables = new ArrayList<String[]>(); + for(Map.Entry<String,String> entry: plan.getContext().getAllKeyValus().entrySet()) { + if (SessionVars.exists(entry.getKey()) && SessionVars.isPublic(SessionVars.get(entry.getKey()))) { + sessionVariables.add(new String[]{entry.getKey(), entry.getValue()}); + } + } + queryHistory.setSessionVariables(sessionVariables); + + return queryHistory; + } + public List<String> getDiagnostics() { readLock.lock(); try { @@ -385,6 +423,7 @@ public class Query implements EventHandler<QueryEvent> { } query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId())); query.setFinishTime(); + return finalState; } http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java index 536778a..d949ca4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java @@ -146,6 +146,8 @@ public class QueryInProgress extends CompositeService { if(queryMasterRpc != null) { RpcConnectionPool.getPool((TajoConf)getConfig()).closeConnection(queryMasterRpc); } + + masterContext.getHistoryWriter().appendHistory(queryInfo); super.stop(); } @@ -175,6 +177,7 @@ public class QueryInProgress extends CompositeService { queryInfo.setQueryMaster(resource.getConnectionInfo().getHost()); queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort()); 
queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort()); + queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort()); getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo)); http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java index 955c5b3..00b95ac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java @@ -19,21 +19,41 @@ package org.apache.tajo.master.querymaster; +import com.google.gson.annotations.Expose; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; +import org.apache.tajo.engine.json.CoreGsonHelper; +import org.apache.tajo.ipc.ClientProtos.QueryInfoProto; +import org.apache.tajo.json.GsonObject; +import org.apache.tajo.util.TajoIdUtils; +import org.apache.tajo.util.history.History; -public class QueryInfo { +public class QueryInfo implements GsonObject, History { private QueryId queryId; + @Expose private String sql; - private String jsonExpr; + @Expose private TajoProtos.QueryState queryState; + @Expose private float progress; + @Expose private long startTime; + @Expose private long finishTime; + @Expose private String lastMessage; + @Expose private String hostNameOfQM; + @Expose private int queryMasterPort; + @Expose private int queryMasterClientPort; + @Expose + private int queryMasterInfoPort; + @Expose + private String queryIdStr; + + private String jsonExpr; public QueryInfo(QueryId queryId) { this(queryId, null, null); @@ -41,6 +61,7 @@ public class QueryInfo { public QueryInfo(QueryId queryId, String sql, String jsonExpr) { 
this.queryId = queryId; + this.queryIdStr = queryId.toString(); this.sql = sql; this.jsonExpr = jsonExpr; this.queryState = TajoProtos.QueryState.QUERY_MASTER_INIT; @@ -60,7 +81,14 @@ public class QueryInfo { public void setQueryMaster(String hostName) { this.hostNameOfQM = hostName; + } + + public int getQueryMasterInfoPort() { + return queryMasterInfoPort; + } + public void setQueryMasterInfoPort(int queryMasterInfoPort) { + this.queryMasterInfoPort = queryMasterInfoPort; } public void setQueryMasterPort(int port) { @@ -128,4 +156,51 @@ public class QueryInfo { public String getJsonExpr() { return jsonExpr; } + + @Override + public String toJson() { + return CoreGsonHelper.toJson(this, QueryInfo.class); + } + + @Override + public HistoryType getHistoryType() { + return HistoryType.QUERY_SUMMARY; + } + + public static QueryInfo fromJson(String json) { + QueryInfo queryInfo = CoreGsonHelper.fromJson(json, QueryInfo.class); + queryInfo.queryId = TajoIdUtils.parseQueryId(queryInfo.queryIdStr); + return queryInfo; + } + + public String getQueryIdStr() { + return queryIdStr; + } + + public QueryInfoProto getProto() { + QueryInfoProto.Builder builder = QueryInfoProto.newBuilder(); + + builder.setQueryId(queryId.toString()) + .setQueryState(queryState) + .setProgress(progress) + .setStartTime(startTime) + .setFinishTime(finishTime) + .setQueryMasterPort(queryMasterPort) + .setQueryMasterClientPort(queryMasterClientPort) + .setQueryMasterInfoPort(queryMasterInfoPort); + + if (sql != null) { + builder.setSql(sql); + } + + if (lastMessage != null) { + builder.setLastMessage(lastMessage); + } + + if (hostNameOfQM != null) { + builder.setHostNameOfQM(hostNameOfQM); + } + + return builder.build(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index 7c3d799..b3b4dbb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -44,6 +44,7 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.util.HAServiceUtil; import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.history.QueryHistory; import org.apache.tajo.worker.TajoWorker; import java.util.ArrayList; @@ -393,58 +394,64 @@ public class QueryMaster extends CompositeService implements EventHandler { } public void stopQuery(QueryId queryId) { - QueryMasterTask queryMasterTask; - queryMasterTask = queryMasterTasks.remove(queryId); - if(queryMasterTask == null) return; + QueryMasterTask queryMasterTask = queryMasterTasks.remove(queryId); + if(queryMasterTask == null) { + LOG.warn("No query info:" + queryId); + return; + } finishedQueryMasterTasks.put(queryId, queryMasterTask); - if(queryMasterTask != null) { - TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask); - CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>(); + TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask); + CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>(); - NettyClientBase tmClient = null; - try { - // In TajoMaster HA mode, if backup master be active status, - // worker may fail to connect existing active master. Thus, - // if worker can't connect the master, worker should try to connect another master and - // update master address in worker context. 
- if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } catch (Exception e) { - queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf)); - queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - } else { + NettyClientBase tmClient = null; + try { + // In TajoMaster HA mode, if backup master be active status, + // worker may fail to connect existing active master. Thus, + // if worker can't connect the master, worker should try to connect another master and + // update master address in worker context. + if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + try { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } catch (Exception e) { + queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf)); + queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf)); tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), TajoMasterProtocol.class, true); } + } else { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } - TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); - masterClientService.heartbeat(future.getController(), queryHeartbeat, future); - } catch (Exception e) { - //this function will be closed in new thread. 
- //When tajo do stop cluster, tajo master maybe throw closed connection exception + TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); + masterClientService.heartbeat(future.getController(), queryHeartbeat, future); + } catch (Exception e) { + //this function will be closed in new thread. + //When tajo do stop cluster, tajo master maybe throw closed connection exception - LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(tmClient); - } + LOG.error(e.getMessage(), e); + } finally { + connPool.releaseConnection(tmClient); + } - try { - queryMasterTask.stop(); - if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) { - cleanup(queryId); - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); + try { + queryMasterTask.stop(); + if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) { + cleanup(queryId); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + Query query = queryMasterTask.getQuery(); + if (query != null) { + QueryHistory queryHisory = query.getQueryHistory(); + if (queryHisory != null) { + query.context.getQueryMasterContext().getWorkerContext(). 
+ getTaskHistoryWriter().appendHistory(queryHisory); } - } else { - LOG.warn("No query info:" + queryId); } if(workerContext.isYarnContainerMode()) { stop(); http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java index fe2752f..0f275e9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java @@ -32,6 +32,7 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.QueryUnitId; +import org.apache.tajo.TajoProtos.TaskAttemptState; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto; import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto; @@ -42,8 +43,10 @@ import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttem import org.apache.tajo.plan.logical.*; import org.apache.tajo.storage.DataLocation; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.Pair; import org.apache.tajo.util.TajoIdUtils; +import org.apache.tajo.util.history.QueryUnitHistory; import org.apache.tajo.worker.FetchImpl; import java.net.URI; @@ -84,6 +87,7 @@ public class QueryUnit implements EventHandler<TaskEvent> { private QueryUnitAttemptId successfulAttempt; private String succeededHost; + private int succeededHostPort; private int succeededPullServerPort; private int failedAttempts; @@ -96,6 +100,8 @@ public class QueryUnit implements EventHandler<TaskEvent> { private static final 
AttemptKilledTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition(); + private QueryUnitHistory finalQueryUnitHistory; + protected static final StateMachineFactory <QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory = new StateMachineFactory <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW) @@ -214,6 +220,76 @@ public class QueryUnit implements EventHandler<TaskEvent> { } } + public TaskAttemptState getLastAttemptStatus() { + QueryUnitAttempt lastAttempt = getLastAttempt(); + if (lastAttempt != null) { + return lastAttempt.getState(); + } else { + return TaskAttemptState.TA_ASSIGNED; + } + } + + public QueryUnitHistory getQueryUnitHistory() { + if (finalQueryUnitHistory != null) { + if (finalQueryUnitHistory.getFinishTime() == 0) { + finalQueryUnitHistory = makeQueryUnitHistory(); + } + return finalQueryUnitHistory; + } else { + return makeQueryUnitHistory(); + } + } + + private QueryUnitHistory makeQueryUnitHistory() { + QueryUnitHistory queryUnitHistory = new QueryUnitHistory(); + + QueryUnitAttempt lastAttempt = getLastAttempt(); + if (lastAttempt != null) { + queryUnitHistory.setId(lastAttempt.getId().toString()); + queryUnitHistory.setState(lastAttempt.getState().toString()); + queryUnitHistory.setProgress(lastAttempt.getProgress()); + } + queryUnitHistory.setHostAndPort(succeededHost + ":" + succeededHostPort); + queryUnitHistory.setRetryCount(this.getRetryCount()); + queryUnitHistory.setLaunchTime(launchTime); + queryUnitHistory.setFinishTime(finishTime); + + queryUnitHistory.setNumShuffles(getShuffleOutpuNum()); + if (!getShuffleFileOutputs().isEmpty()) { + ShuffleFileOutput shuffleFileOutputs = getShuffleFileOutputs().get(0); + if (queryUnitHistory.getNumShuffles() > 0) { + queryUnitHistory.setShuffleKey("" + shuffleFileOutputs.getPartId()); + queryUnitHistory.setShuffleFileName(shuffleFileOutputs.getFileName()); + } + } + + List<String> fragmentList = new ArrayList<String>(); + for (FragmentProto eachFragment : 
getAllFragments()) { + FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment); + fragmentList.add(fileFragment.toString()); + } + queryUnitHistory.setFragments(fragmentList.toArray(new String[]{})); + + List<String[]> fetchList = new ArrayList<String[]>(); + for (Map.Entry<String, Set<FetchImpl>> e : getFetchMap().entrySet()) { + for (FetchImpl f : e.getValue()) { + for (URI uri : f.getSimpleURIs()){ + fetchList.add(new String[] {e.getKey(), uri.toString()}); + } + } + } + + queryUnitHistory.setFetchs(fetchList.toArray(new String[][]{})); + + List<String> dataLocationList = new ArrayList<String>(); + for(DataLocation eachLocation: getDataLocations()) { + dataLocationList.add(eachLocation.toString()); + } + + queryUnitHistory.setDataLocations(dataLocationList.toArray(new String[]{})); + return queryUnitHistory; + } + public void setLogicalPlan(LogicalNode plan) { this.plan = plan; @@ -488,6 +564,7 @@ public class QueryUnit implements EventHandler<TaskEvent> { private void finishTask() { this.finishTime = System.currentTimeMillis(); + finalQueryUnitHistory = makeQueryUnitHistory(); } private static class KillNewTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> { @@ -527,6 +604,7 @@ public class QueryUnit implements EventHandler<TaskEvent> { task.successfulAttempt = attemptEvent.getTaskAttemptId(); task.succeededHost = attempt.getWorkerConnectionInfo().getHost(); + task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort(); task.succeededPullServerPort = attempt.getWorkerConnectionInfo().getPullServerPort(); task.finishTask(); @@ -542,6 +620,7 @@ public class QueryUnit implements EventHandler<TaskEvent> { QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId()); task.launchTime = System.currentTimeMillis(); task.succeededHost = attempt.getWorkerConnectionInfo().getHost(); + task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort(); } } 
http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 91fd22d..96534df 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -64,6 +64,8 @@ import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.history.QueryUnitHistory; +import org.apache.tajo.util.history.SubQueryHistory; import org.apache.tajo.worker.FetchImpl; import java.io.IOException; @@ -280,6 +282,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { private TaskSchedulerContext schedulerContext; private List<IntermediateEntry> hashShuffleIntermediateEntries = new ArrayList<IntermediateEntry>(); private AtomicInteger completeReportReceived = new AtomicInteger(0); + private SubQueryHistory finalSubQueryHistory; public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block, StorageManager sm) { @@ -394,6 +397,76 @@ public class SubQuery implements EventHandler<SubQueryEvent> { tasks.put(task.getId(), task); } + public SubQueryHistory getSubQueryHistory() { + if (finalSubQueryHistory != null) { + if (finalSubQueryHistory.getFinishTime() == 0) { + finalSubQueryHistory = makeSubQueryHistory(); + finalSubQueryHistory.setQueryUnits(makeQueryUnitHistories()); + } + return finalSubQueryHistory; + } else { + return makeSubQueryHistory(); + } + } + + private List<QueryUnitHistory> makeQueryUnitHistories() { + List<QueryUnitHistory> queryUnitHistories = new 
ArrayList<QueryUnitHistory>(); + + for(QueryUnit eachQueryUnit: getQueryUnits()) { + queryUnitHistories.add(eachQueryUnit.getQueryUnitHistory()); + } + + return queryUnitHistories; + } + + private SubQueryHistory makeSubQueryHistory() { + SubQueryHistory subQueryHistory = new SubQueryHistory(); + + subQueryHistory.setExecutionBlockId(getId().toString()); + subQueryHistory.setPlan(PlannerUtil.buildExplainString(block.getPlan())); + subQueryHistory.setState(getState().toString()); + subQueryHistory.setStartTime(startTime); + subQueryHistory.setFinishTime(finishTime); + subQueryHistory.setSucceededObjectCount(succeededObjectCount); + subQueryHistory.setKilledObjectCount(killedObjectCount); + subQueryHistory.setFailedObjectCount(failedObjectCount); + subQueryHistory.setTotalScheduledObjectsCount(totalScheduledObjectsCount); + subQueryHistory.setHostLocalAssigned(getTaskScheduler().getHostLocalAssigned()); + subQueryHistory.setRackLocalAssigned(getTaskScheduler().getRackLocalAssigned()); + + long totalInputBytes = 0; + long totalReadBytes = 0; + long totalReadRows = 0; + long totalWriteBytes = 0; + long totalWriteRows = 0; + int numShuffles = 0; + for(QueryUnit eachQueryUnit: getQueryUnits()) { + numShuffles = eachQueryUnit.getShuffleOutpuNum(); + if (eachQueryUnit.getLastAttempt() != null) { + TableStats inputStats = eachQueryUnit.getLastAttempt().getInputStats(); + if (inputStats != null) { + totalInputBytes += inputStats.getNumBytes(); + totalReadBytes += inputStats.getReadBytes(); + totalReadRows += inputStats.getNumRows(); + } + TableStats outputStats = eachQueryUnit.getLastAttempt().getResultStats(); + if (outputStats != null) { + totalWriteBytes += outputStats.getNumBytes(); + totalWriteRows += outputStats.getNumRows(); + } + } + } + + subQueryHistory.setTotalInputBytes(totalInputBytes); + subQueryHistory.setTotalReadBytes(totalReadBytes); + subQueryHistory.setTotalReadRows(totalReadRows); + subQueryHistory.setTotalWriteBytes(totalWriteBytes); + 
subQueryHistory.setTotalWriteRows(totalWriteRows); + subQueryHistory.setNumShuffles(numShuffles); + subQueryHistory.setProgress(getProgress()); + return subQueryHistory; + } + /** * It finalizes this subquery. It is only invoked when the subquery is succeeded. */ @@ -1172,6 +1245,9 @@ public class SubQuery implements EventHandler<SubQueryEvent> { getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds); } + + this.finalSubQueryHistory = makeSubQueryHistory(); + this.finalSubQueryHistory.setQueryUnits(makeQueryUnitHistories()); } public List<IntermediateEntry> getHashShuffleIntermediateEntries() { http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java index 9d0dcaa..6d3597d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java @@ -23,10 +23,14 @@ import org.apache.tajo.catalog.FunctionDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.master.ha.HAService; import org.apache.tajo.master.querymaster.QueryInProgress; import org.apache.tajo.master.querymaster.QueryMasterTask; import org.apache.tajo.master.querymaster.QueryUnit; import org.apache.tajo.master.querymaster.SubQuery; +import org.apache.tajo.util.history.QueryUnitHistory; +import org.apache.tajo.util.history.SubQueryHistory; import org.apache.tajo.worker.TaskRunnerHistory; import org.apache.tajo.worker.TaskRunner; @@ -38,7 +42,7 @@ import static org.apache.tajo.conf.TajoConf.ConfVars; public class JSPUtil { static DecimalFormat decimalF = new DecimalFormat("###.0"); - 
public static void sortQueryUnit(QueryUnit[] queryUnits, String sortField, String sortOrder) { + public static void sortQueryUnitArray(QueryUnit[] queryUnits, String sortField, String sortOrder) { if(sortField == null || sortField.isEmpty()) { sortField = "id"; } @@ -46,6 +50,22 @@ public class JSPUtil { Arrays.sort(queryUnits, new QueryUnitComparator(sortField, "asc".equals(sortOrder))); } + public static void sortQueryUnit(List<QueryUnit> queryUnits, String sortField, String sortOrder) { + if(sortField == null || sortField.isEmpty()) { + sortField = "id"; + } + + Collections.sort(queryUnits, new QueryUnitComparator(sortField, "asc".equals(sortOrder))); + } + + public static void sortQueryUnitHistory(List<QueryUnitHistory> queryUnits, String sortField, String sortOrder) { + if(sortField == null || sortField.isEmpty()) { + sortField = "id"; + } + + Collections.sort(queryUnits, new QueryUnitHistoryComparator(sortField, "asc".equals(sortOrder))); + } + public static void sortTaskRunner(List<TaskRunner> taskRunners) { Collections.sort(taskRunners, new Comparator<TaskRunner>() { @Override @@ -147,6 +167,43 @@ public class JSPUtil { return subQueryList; } + public static List<SubQueryHistory> sortSubQueryHistory(Collection<SubQueryHistory> subQueries) { + List<SubQueryHistory> subQueryList = new ArrayList<SubQueryHistory>(subQueries); + Collections.sort(subQueryList, new Comparator<SubQueryHistory>() { + @Override + public int compare(SubQueryHistory subQuery1, SubQueryHistory subQuery2) { + long q1StartTime = subQuery1.getStartTime(); + long q2StartTime = subQuery2.getStartTime(); + + q1StartTime = (q1StartTime == 0 ? Long.MAX_VALUE : q1StartTime); + q2StartTime = (q2StartTime == 0 ? 
Long.MAX_VALUE : q2StartTime); + + int result = compareLong(q1StartTime, q2StartTime); + if (result == 0) { + return subQuery1.getExecutionBlockId().compareTo(subQuery2.getExecutionBlockId()); + } else { + return result; + } + } + }); + + return subQueryList; + } + + public static String getMasterActiveLabel(MasterContext context) { + HAService haService = context.getHAService(); + String activeLabel = ""; + if (haService != null) { + if (haService.isActiveStatus()) { + activeLabel = "<font color='#1e90ff'>(active)</font>"; + } else { + activeLabel = "<font color='#1e90ff'>(backup)</font>"; + } + } + + return activeLabel; + } + static class QueryUnitComparator implements Comparator<QueryUnit> { private String sortField; private boolean asc; @@ -194,6 +251,53 @@ public class JSPUtil { } } + static class QueryUnitHistoryComparator implements Comparator<QueryUnitHistory> { + private String sortField; + private boolean asc; + public QueryUnitHistoryComparator(String sortField, boolean asc) { + this.sortField = sortField; + this.asc = asc; + } + + @Override + public int compare(QueryUnitHistory queryUnit, QueryUnitHistory queryUnit2) { + if(asc) { + if("id".equals(sortField)) { + return queryUnit.getId().compareTo(queryUnit2.getId()); + } else if("host".equals(sortField)) { + String host1 = queryUnit.getHostAndPort() == null ? "-" : queryUnit.getHostAndPort(); + String host2 = queryUnit2.getHostAndPort() == null ? 
"-" : queryUnit2.getHostAndPort(); + return host1.compareTo(host2); + } else if("runTime".equals(sortField)) { + return compareLong(queryUnit.getRunningTime(), queryUnit2.getRunningTime()); + } else if("startTime".equals(sortField)) { + return compareLong(queryUnit.getLaunchTime(), queryUnit2.getLaunchTime()); + } else { + return queryUnit.getId().compareTo(queryUnit2.getId()); + } + } else { + if("id".equals(sortField)) { + return queryUnit2.getId().compareTo(queryUnit.getId()); + } else if("host".equals(sortField)) { + String host1 = queryUnit.getHostAndPort() == null ? "-" : queryUnit.getHostAndPort(); + String host2 = queryUnit2.getHostAndPort() == null ? "-" : queryUnit2.getHostAndPort(); + return host2.compareTo(host1); + } else if("runTime".equals(sortField)) { + if(queryUnit2.getLaunchTime() == 0) { + return -1; + } else if(queryUnit.getLaunchTime() == 0) { + return 1; + } + return compareLong(queryUnit2.getRunningTime(), queryUnit.getRunningTime()); + } else if("startTime".equals(sortField)) { + return compareLong(queryUnit2.getLaunchTime(), queryUnit.getLaunchTime()); + } else { + return queryUnit2.getId().compareTo(queryUnit.getId()); + } + } + } + } + static int compareLong(long a, long b) { if(a > b) { return 1; @@ -246,4 +350,60 @@ public class JSPUtil { return result; } + + public static String getPageNavigation(int currentPage, int totalPage, String url) { + StringBuilder sb = new StringBuilder(); + + int pageIndex = (currentPage - 1) / 10; + int totalPageIndex = (totalPage - 1) / 10; + + String prefix = ""; + + if (pageIndex > 0) { + int prevPage = pageIndex * 10; + sb.append(prefix).append("<a href='").append(url) + .append("&page=").append(prevPage).append("'>") + .append("<</a>"); + prefix = " "; + } + + for (int i = 1; i <= 10; i++) { + int printPage = pageIndex * 10 + i; + if (printPage == currentPage) { + sb.append(prefix).append(printPage); + } else { + sb.append(prefix).append("<a href='").append(url) + 
.append("&page=").append(printPage).append("'>") + .append("[").append(printPage).append("]</a>"); + } + prefix = " "; + if (printPage >= totalPage) { + break; + } + } + + if(totalPageIndex > pageIndex) { + int nextPage = (pageIndex + 1) * 10 + 1; + sb.append(prefix).append("<a href='").append(url) + .append("&page=").append(nextPage).append("'>") + .append("></a>"); + } + return sb.toString(); + } + + public static <T extends Object> List<T> getPageNavigationList(List<T> originList, int page, int pageSize) { + if (originList == null) { + return new ArrayList<T>(); + } + int start = (page - 1) * pageSize; + int end = start + pageSize; + if (end > originList.size()) { + end = originList.size(); + } + if (!originList.isEmpty()) { + return originList.subList(start, end); + } + + return originList; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/util/history/History.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/History.java b/tajo-core/src/main/java/org/apache/tajo/util/history/History.java new file mode 100644 index 0000000..0312d4c --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/History.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util.history; + +public interface History { + public static enum HistoryType { + TASK, QUERY, QUERY_SUMMARY + } + + public HistoryType getHistoryType(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java new file mode 100644 index 0000000..868dfcd --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.util.history; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; + +import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.concurrent.atomic.AtomicBoolean; + +public class HistoryCleaner extends Thread { + private static final Log LOG = LogFactory.getLog(HistoryCleaner.class); + + private int historyExpireDays; + private AtomicBoolean stopped = new AtomicBoolean(false); + private Path historyParentPath; + private Path taskHistoryParentPath; + private TajoConf tajoConf; + private boolean isMaster; + + public HistoryCleaner(TajoConf tajoConf, boolean isMaster) throws IOException { + super(HistoryCleaner.class.getName()); + + this.tajoConf = tajoConf; + this.isMaster = isMaster; + historyExpireDays = tajoConf.getIntVar(ConfVars.HISTORY_EXPIRY_TIME_DAY); + historyParentPath = tajoConf.getQueryHistoryDir(tajoConf); + taskHistoryParentPath = tajoConf.getTaskHistoryDir(tajoConf); + } + + public void doStop() { + stopped.set(true); + this.interrupt(); + } + + @Override + public void run() { + LOG.info("History cleaner started: expiry day=" + historyExpireDays); + SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd"); + + while (!stopped.get()) { + try { + Thread.sleep(60 * 60 * 12 * 1000); // 12 hours + } catch (InterruptedException e) { + } + + if (stopped.get()) { + break; + } + + try { + Calendar cal = Calendar.getInstance(); + cal.add(Calendar.DAY_OF_MONTH, -historyExpireDays); + + long cleanTargetTime = cal.getTime().getTime(); + + // Clean query history directory + if (isMaster) { + FileSystem fs = historyParentPath.getFileSystem(tajoConf); + if (fs.exists(historyParentPath)) { + FileStatus[] files = 
fs.listStatus(historyParentPath); + if (files != null) { + for (FileStatus eachFile : files) { + String pathName = eachFile.getPath().getName(); + long pathTime; + try { + pathTime = df.parse(pathName).getTime(); + } catch (ParseException e) { + LOG.warn(eachFile.getPath() + " is not History directory format."); + continue; + } + + if (pathTime < cleanTargetTime) { + LOG.info("Cleaning query history dir: " + eachFile.getPath()); + fs.delete(eachFile.getPath(), true); + } + } + } + } + } + + if (!isMaster) { + // Clean task history directory + FileSystem fs = taskHistoryParentPath.getFileSystem(tajoConf); + if (fs.exists(taskHistoryParentPath)) { + FileStatus[] files = fs.listStatus(taskHistoryParentPath); + if (files != null) { + for (FileStatus eachFile : files) { + String pathName = eachFile.getPath().getName(); + long pathTime; + try { + pathTime = df.parse(pathName).getTime(); + } catch (ParseException e) { + LOG.warn(eachFile.getPath() + " is not History directory format."); + continue; + } + + if (pathTime < cleanTargetTime) { + LOG.info("Cleaning task history dir: " + eachFile.getPath()); + fs.delete(eachFile.getPath(), true); + } + } + } + } + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + LOG.info("History cleaner stopped"); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java new file mode 100644 index 0000000..21bc725 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util.history; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto; +import org.apache.tajo.master.querymaster.QueryInfo; +import org.apache.tajo.worker.TaskHistory; + +import java.io.EOFException; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.*; + +public class HistoryReader { + private static final Log LOG = LogFactory.getLog(HistoryReader.class); + + public static final int DEFAULT_PAGE_SIZE = 100; + public static final int DEFAULT_TASK_PAGE_SIZE = 2000; + private String processName; + private TajoConf tajoConf; + private Path historyParentPath; + private Path taskHistoryParentPath; + + public HistoryReader(String processName, TajoConf tajoConf) throws IOException { + this.processName = processName.replaceAll(":", "_").toLowerCase(); + this.tajoConf = tajoConf; + + historyParentPath = tajoConf.getQueryHistoryDir(tajoConf); + taskHistoryParentPath = 
tajoConf.getTaskHistoryDir(tajoConf); + } + + public List<QueryInfo> getQueries(String keyword) throws IOException { + List<QueryInfo> queryInfos = new ArrayList<QueryInfo>(); + + FileSystem fs = HistoryWriter.getNonCrcFileSystem(historyParentPath, tajoConf); + if (!fs.exists(historyParentPath)) { + return queryInfos; + } + FileStatus[] files = fs.listStatus(historyParentPath); + if (files == null || files.length == 0) { + return queryInfos; + } + + for (FileStatus eachDateFile: files) { + if (eachDateFile.isFile()) { + continue; + } + FileStatus[] dateFiles = fs.listStatus(new Path(eachDateFile.getPath(), HistoryWriter.QUERY_LIST)); + if (dateFiles == null || dateFiles.length == 0) { + continue; + } + + for (FileStatus eachFile: dateFiles) { + if (eachFile.isDirectory()) { + continue; + } + + Path path = eachFile.getPath(); + if (!path.getName().endsWith(HistoryWriter.HISTORY_FILE_POSTFIX)) { + continue; + } + + FSDataInputStream in = null; + try { + in = fs.open(path); + + byte[] buf = new byte[100 * 1024]; + while (true) { + int length = in.readInt(); + if (length > buf.length) { + buf = new byte[length]; + } + in.readFully(buf, 0, length); + String queryInfoJson = new String(buf, 0, length); + QueryInfo queryInfo = QueryInfo.fromJson(queryInfoJson); + if (keyword != null) { + if (queryInfo.getSql().indexOf(keyword) >= 0) { + queryInfos.add(queryInfo); + } + } else { + queryInfos.add(queryInfo); + } + } + } catch (EOFException e) { + } catch (Exception e) { + LOG.error("Reading error:" + path + ", " +e.getMessage(), e); + } finally { + in.close(); + } + } + } + + Collections.sort(queryInfos, new Comparator<QueryInfo>() { + @Override + public int compare(QueryInfo query1, QueryInfo query2) { + return query2.getQueryIdStr().toString().compareTo(query1.getQueryIdStr().toString()); + } + }); + + return queryInfos; + } + + private Path getQueryHistoryFilePath(String queryId, long startTime) throws IOException { + if (startTime == 0) { + String[] tokens = 
queryId.split("_"); + if (tokens.length == 3) { + startTime = Long.parseLong(tokens[1]); + } else { + startTime = System.currentTimeMillis(); + } + } + Path queryHistoryPath = HistoryWriter.getQueryHistoryFilePath(historyParentPath, queryId, startTime); + FileSystem fs = HistoryWriter.getNonCrcFileSystem(queryHistoryPath, tajoConf); + + if (!fs.exists(queryHistoryPath)) { + LOG.info("No query history file: " + queryHistoryPath); + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(startTime); + cal.add(Calendar.DAY_OF_MONTH, -1); + queryHistoryPath = HistoryWriter.getQueryHistoryFilePath(historyParentPath, queryId, startTime); + if (!fs.exists(queryHistoryPath)) { + LOG.info("No query history file: " + queryHistoryPath); + cal.setTimeInMillis(startTime); + cal.add(Calendar.DAY_OF_MONTH, 1); + queryHistoryPath = HistoryWriter.getQueryHistoryFilePath(historyParentPath, queryId, startTime); + } + if (!fs.exists(queryHistoryPath)) { + LOG.info("No query history file: " + queryHistoryPath); + return null; + } + } + return queryHistoryPath; + } + + public QueryHistory getQueryHistory(String queryId) throws IOException { + return getQueryHistory(queryId, 0); + } + + public QueryHistory getQueryHistory(String queryId, long startTime) throws IOException { + Path queryHistoryFile = getQueryHistoryFilePath(queryId, startTime); + if (queryHistoryFile == null) { + return null; + } + FileSystem fs = HistoryWriter.getNonCrcFileSystem(queryHistoryFile, tajoConf); + + FileStatus fileStatus = fs.getFileStatus(queryHistoryFile); + if (fileStatus.getLen() > 10 * 1024 * 1024) { + throw new IOException("QueryHistory file is too big: " + + queryHistoryFile + ", " + fileStatus.getLen() + " bytes"); + } + FSDataInputStream in = null; + try { + in = fs.open(queryHistoryFile); + byte[] buf = new byte[(int)fileStatus.getLen()]; + + in.readFully(buf, 0, buf.length); + + return QueryHistory.fromJson(new String(buf)); + } finally { + if (in != null) { + in.close(); + } + } + } + + 
public List<QueryUnitHistory> getQueryUnitHistory(String queryId, String ebId) throws IOException { + Path queryHistoryFile = getQueryHistoryFilePath(queryId, 0); + if (queryHistoryFile == null) { + return new ArrayList<QueryUnitHistory>(); + } + Path detailFile = new Path(queryHistoryFile.getParent(), ebId + HistoryWriter.HISTORY_FILE_POSTFIX); + FileSystem fs = HistoryWriter.getNonCrcFileSystem(detailFile, tajoConf); + + if (!fs.exists(detailFile)) { + return new ArrayList<QueryUnitHistory>(); + } + + FileStatus fileStatus = fs.getFileStatus(detailFile); + if (fileStatus.getLen() > 100 * 1024 * 1024) { // 100MB + throw new IOException("QueryUnitHistory file is too big: " + + detailFile + ", " + fileStatus.getLen() + " bytes"); + } + + FSDataInputStream in = null; + try { + in = fs.open(detailFile); + byte[] buf = new byte[(int)fileStatus.getLen()]; + + in.readFully(buf, 0, buf.length); + + return SubQueryHistory.fromJsonQueryUnits(new String(buf)); + } finally { + if (in != null) { + in.close(); + } + } + } + + public TaskHistory getTaskHistory(String queryUnitAttemptId, long startTime) throws IOException { + FileSystem fs = HistoryWriter.getNonCrcFileSystem(taskHistoryParentPath, tajoConf); + + SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH"); + + Calendar cal = Calendar.getInstance(); + cal.setTime(new Date(startTime)); + + //current, current-1, current+1 hour + String[] targetHistoryFileDates = new String[3]; + targetHistoryFileDates[0] = df.format(cal.getTime()); + + cal.add(Calendar.HOUR_OF_DAY, -1); + targetHistoryFileDates[1] = df.format(cal.getTime()); + + cal.setTime(new Date(startTime)); + cal.add(Calendar.HOUR_OF_DAY, 1); + targetHistoryFileDates[2] = df.format(cal.getTime()); + + for (String historyFileDate : targetHistoryFileDates) { + Path fileParent = new Path(taskHistoryParentPath, historyFileDate.substring(0, 8) + "/tasks/" + processName); + String hour = historyFileDate.substring(8, 10); + + if (!fs.exists(fileParent)) { + if 
(LOG.isDebugEnabled()) { + LOG.debug("Task history parent not exists:" + fileParent); + } + continue; + } + + FileStatus[] files = fs.listStatus(fileParent); + if (files == null || files.length == 0) { + return null; + } + + String filePrefix = processName + "_" + hour + "_"; + + for (FileStatus eachFile : files) { + if (eachFile.getPath().getName().indexOf(filePrefix) != 0) { + continue; + } + + FSDataInputStream in = null; + TaskHistoryProto.Builder builder = TaskHistoryProto.newBuilder(); + try { + FileStatus status = fs.getFileStatus(eachFile.getPath()); + LOG.info("Finding TaskHistory from " + status.getLen() + "," + eachFile.getPath()); + + in = fs.open(eachFile.getPath()); + while (true) { + int len = in.readInt(); + byte[] buf = new byte[len]; + in.readFully(buf, 0, len); + + builder.clear(); + TaskHistoryProto taskHistoryProto = builder.mergeFrom(buf).build(); + QueryUnitAttemptId attemptId = new QueryUnitAttemptId(taskHistoryProto.getQueryUnitAttemptId()); + if (attemptId.toString().equals(queryUnitAttemptId)) { + return new TaskHistory(taskHistoryProto); + } + } + } catch (EOFException e) { + } finally { + if (in != null) { + in.close(); + } + } + } + } + return null; + } + + public QueryInfo getQueryInfo(String queryId) throws IOException { + List<QueryInfo> queries = getQueries(null); + + if (queries != null) { + for (QueryInfo queryInfo: queries) { + if (queryId.equals(queryInfo.getQueryId().toString())) { + return queryInfo; + } + } + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e01b00a7/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java new file mode 100644 index 0000000..63a143b --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java @@ 
-0,0 +1,450 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.util.history;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.querymaster.QueryInfo;
import org.apache.tajo.worker.TaskHistory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Persists query and task histories asynchronously.
 *
 * <p>Callers enqueue {@link History} objects with {@link #appendHistory(History)}. A single
 * background thread drains the queue and writes each entry according to its history type.
 *
 * <p>History directory structure:
 * <pre>{@code
 * tajo.query-history.path: hdfs
 * tajo.task-history.path: local or hdfs
 *
 * <query history dir>/<yyyyMMdd>/query-list/query-list-<HHmmss>.hist (TajoMaster's query list, rolled hourly)
 *                               /query-detail/<QUERY_ID>/query.hist (QueryMaster's query detail)
 *                                                       /<EB_ID>.hist (QueryMaster's subquery detail)
 * <task history dir>/<yyyyMMdd>/tasks/<WORKER_HOST>_<WORKER_PORT>/<WORKER_HOST>_<WORKER_PORT>_<HH>_<seq>.hist
 * }</pre>
 * The query and task history dirs come from {@code TajoConf.getQueryHistoryDir} and
 * {@code TajoConf.getTaskHistoryDir}. History files are kept for "tajo.history.expiry-time-day"
 * (default value is 7 days).
 */
public class HistoryWriter extends AbstractService {
  private static final Log LOG = LogFactory.getLog(HistoryWriter.class);
  public static final String QUERY_LIST = "query-list";
  public static final String QUERY_DETAIL = "query-detail";
  public static final String HISTORY_FILE_POSTFIX = ".hist";

  // How long the writer thread waits for new histories before doing its periodic housekeeping.
  private static final long QUEUE_POLL_TIMEOUT_MS = 60 * 1000;
  // The query list file is rolled once it has been open for this long.
  private static final long QUERY_LIST_ROLLING_INTERVAL_MS = 60 * 60 * 1000;
  // Upper bound on how long serviceStop() waits for the writer thread to exit.
  private static final long WRITER_STOP_TIMEOUT_MS = 10 * 1000;

  private final LinkedBlockingQueue<History> historyQueue = new LinkedBlockingQueue<History>();

  // Task history writers keyed by task start hour (yyyyMMddHH). Only the writer thread touches
  // this map while it runs; serviceStop() closes the writers after asking the thread to exit.
  private final Map<String, WriterHolder> taskWriters = new HashMap<String, WriterHolder>();

  // For TajoMaster's query list
  private WriterHolder querySummaryWriter = null;

  private WriterThread writerThread;
  private final AtomicBoolean stopped = new AtomicBoolean(false);
  private Path historyParentPath;
  private Path taskHistoryParentPath;
  private final String processName;
  private TajoConf tajoConf;
  private HistoryCleaner historyCleaner;
  private final boolean isMaster;

  /**
   * @param processName name of this process (e.g. host:port); ':' is replaced with '_' and the
   *                    result is lower-cased for use in task history file names
   * @param isMaster passed through to the {@link HistoryCleaner}
   */
  public HistoryWriter(String processName, boolean isMaster) {
    super(HistoryWriter.class.getName() + ":" + processName);
    this.processName = processName.replaceAll(":", "_").toLowerCase();
    this.isMaster = isMaster;
  }

  @Override
  public void serviceInit(Configuration conf) throws Exception {
    tajoConf = (TajoConf) conf;
    historyParentPath = tajoConf.getQueryHistoryDir(tajoConf);
    taskHistoryParentPath = tajoConf.getTaskHistoryDir(tajoConf);
    writerThread = new WriterThread();
    historyCleaner = new HistoryCleaner(tajoConf, isMaster);
    super.serviceInit(conf);
  }

  /**
   * Stops the writer thread, then closes every open history file and stops the cleaner.
   * Histories still in the queue when the thread stops are not written.
   */
  @Override
  public void serviceStop() throws Exception {
    // Stop the writer thread first so that it cannot write to (or reopen) a writer while the
    // writers are being closed below.
    stopped.set(true);
    if (writerThread != null) {
      writerThread.interrupt();
      try {
        writerThread.join(WRITER_STOP_TIMEOUT_MS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      if (writerThread.isAlive()) {
        LOG.warn("HistoryWriter_" + processName + " did not stop within " + WRITER_STOP_TIMEOUT_MS + " ms");
      }
    }

    for (WriterHolder eachWriter : taskWriters.values()) {
      closeWriter(eachWriter);
    }
    taskWriters.clear();

    if (querySummaryWriter != null) {
      closeWriter(querySummaryWriter);
    }

    if (historyCleaner != null) {
      historyCleaner.doStop();
    }
    super.serviceStop();
  }

  @Override
  public void serviceStart() throws Exception {
    writerThread.start();
    historyCleaner.start();
    super.serviceStart();
  }

  /** Enqueues a history to be written asynchronously by the writer thread. */
  public void appendHistory(History history) {
    historyQueue.add(history);
  }

  /** Closes the holder's stream (if any), logging instead of throwing on failure. */
  private static void closeWriter(WriterHolder holder) {
    if (holder.out == null) {
      return;
    }
    try {
      holder.out.close();
    } catch (Exception err) {
      LOG.error(err.getMessage(), err);
    }
    holder.out = null;
  }

  /**
   * Returns the file system of {@code path}, with checksum writing disabled for the local file
   * system.
   */
  public static FileSystem getNonCrcFileSystem(Path path, Configuration conf) throws IOException {
    // https://issues.apache.org/jira/browse/HADOOP-7844
    // If FileSystem is a local and CheckSumFileSystem, flushing doesn't touch file length.
    // So HistoryReader can't read until closing the file.
    FileSystem fs = path.getFileSystem(conf);
    // Check the resolved file system's scheme: a scheme-less path (resolved against the default
    // file system) has a null scheme in its own URI.
    if ("file".equals(fs.getUri().getScheme())) {
      fs.setWriteChecksum(false);
    }

    return fs;
  }

  /**
   * Returns {@code <historyParentPath>/<yyyyMMdd of startTime>/query-detail/<queryId>/query.hist}.
   */
  public static Path getQueryHistoryFilePath(Path historyParentPath, String queryId, long startTime) {
    SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");

    Path datePath = new Path(historyParentPath, df.format(new Date(startTime)) + "/" + QUERY_DETAIL);
    return new Path(datePath, queryId + "/query" + HISTORY_FILE_POSTFIX);
  }

  /**
   * Same as {@link #getQueryHistoryFilePath(Path, String, long)}, with the start time taken from
   * a {@code q_<timestamp>_<seq>} query id. Falls back to the current time when the id does not
   * have that form.
   */
  public static Path getQueryHistoryFilePath(Path historyParentPath, String queryId) {
    long startTime = System.currentTimeMillis();
    // q_1412483083972_0005 = q_<timestamp>_<seq>
    String[] tokens = queryId.split("_");
    if (tokens.length == 3) {
      try {
        startTime = Long.parseLong(tokens[1]);
      } catch (NumberFormatException e) {
        // Not a timestamp-based query id; keep the current time.
      }
    }
    return getQueryHistoryFilePath(historyParentPath, queryId, startTime);
  }

  /** Background thread that drains {@code historyQueue} and writes the histories. */
  class WriterThread extends Thread {
    @Override
    public void run() {
      LOG.info("HistoryWriter_" + processName + " started.");
      SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
      while (!stopped.get()) {
        List<History> histories = new ArrayList<History>();
        try {
          // Wait for at least one history (or the timeout), then take whatever else is queued.
          History first = historyQueue.poll(QUEUE_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
          if (first != null) {
            histories.add(first);
            historyQueue.drainTo(histories);
          }
        } catch (InterruptedException e) {
          if (stopped.get()) {
            break;
          }
        }
        if (stopped.get()) {
          break;
        }
        try {
          if (!histories.isEmpty()) {
            writeHistory(histories);
          }
        } catch (Exception e) {
          LOG.error(e.getMessage(), e);
        }

        closeExpiredTaskWriters(df);
      }
      LOG.info("HistoryWriter_" + processName + " stopped.");
    }

    /** Closes and forgets task history writers whose hour is two or more hours before now. */
    private void closeExpiredTaskWriters(SimpleDateFormat df) {
      Calendar cal = Calendar.getInstance();
      cal.add(Calendar.HOUR_OF_DAY, -2);
      String closeTargetTime = df.format(cal.getTime());

      Iterator<Map.Entry<String, WriterHolder>> it = taskWriters.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<String, WriterHolder> entry = it.next();
        if (entry.getKey().compareTo(closeTargetTime) > 0) {
          continue;
        }
        it.remove();
        WriterHolder writerHolder = entry.getValue();
        LOG.info("Closing task history file: " + writerHolder.path);
        if (writerHolder.out != null) {
          try {
            writerHolder.out.close();
          } catch (IOException e) {
            LOG.error(e.getMessage(), e);
          }
        }
      }
    }

    /** Writes each history by type; a failure on one history is logged and does not stop the rest. */
    public void writeHistory(List<History> histories) {
      if (histories.isEmpty()) {
        return;
      }
      for (History eachHistory : histories) {
        switch (eachHistory.getHistoryType()) {
          case TASK:
            try {
              writeTaskHistory((TaskHistory) eachHistory);
            } catch (Exception e) {
              LOG.error("Error while saving task history: " +
                  ((TaskHistory) eachHistory).getQueryUnitAttemptId() + ":" + e.getMessage(), e);
            }
            break;
          case QUERY:
            try {
              writeQueryHistory((QueryHistory) eachHistory);
            } catch (Exception e) {
              LOG.error("Error while saving query history: " +
                  ((QueryHistory) eachHistory).getQueryId() + ":" + e.getMessage(), e);
            }
            break;
          case QUERY_SUMMARY:
            try {
              writeQuerySummary((QueryInfo) eachHistory);
            } catch (Exception e) {
              LOG.error("Error while saving query summary: " +
                  ((QueryInfo) eachHistory).getQueryId() + ":" + e.getMessage(), e);
            }
            break;
          default:
            LOG.warn("Wrong history type: " + eachHistory.getHistoryType());
        }
      }
    }

    /**
     * Writes a query's detail files.
     * <ul>
     *   <li>{@code <query history dir>/<yyyyMMdd>/query-detail/<QUERY_ID>/query.hist}: the query
     *       history as JSON</li>
     *   <li>{@code <query history dir>/<yyyyMMdd>/query-detail/<QUERY_ID>/<EB_ID>.hist}: the task
     *       details of each subquery, from {@code SubQueryHistory.toQueryUnitsJson()}</li>
     * </ul>
     */
    private synchronized void writeQueryHistory(QueryHistory queryHistory) throws Exception {
      Path queryHistoryFile = getQueryHistoryFilePath(historyParentPath, queryHistory.getQueryId());
      FileSystem fs = getNonCrcFileSystem(queryHistoryFile, tajoConf);

      if (!fs.exists(queryHistoryFile.getParent())) {
        if (!fs.mkdirs(queryHistoryFile.getParent())) {
          LOG.error("Can't make QueryHistory dir: " + queryHistoryFile.getParent());
          return;
        }
      }

      LOG.info("Saving query history: " + queryHistoryFile);
      writeWholeFile(fs, queryHistoryFile, queryHistory.toJson().getBytes());

      if (queryHistory.getSubQueryHistories() != null) {
        for (SubQueryHistory subQueryHistory : queryHistory.getSubQueryHistories()) {
          Path path = new Path(queryHistoryFile.getParent(),
              subQueryHistory.getExecutionBlockId() + HISTORY_FILE_POSTFIX);
          writeWholeFile(fs, path, subQueryHistory.toQueryUnitsJson().getBytes());
          LOG.info("Saving query unit: " + path);
        }
      }
    }

    /** Creates (or overwrites) {@code path} with {@code bytes}; close failures are only logged. */
    private void writeWholeFile(FileSystem fs, Path path, byte[] bytes) throws IOException {
      FSDataOutputStream out = fs.create(path);
      try {
        out.write(bytes);
      } finally {
        try {
          out.close();
        } catch (Exception err) {
          LOG.error(err.getMessage(), err);
        }
      }
    }

    /**
     * Appends a query summary to the master's query-list file, rolling to a new file once the
     * current one has been open for an hour. Each record is a 4-byte length followed by the
     * JSON bytes, and is hflushed so readers can see it immediately.
     */
    private synchronized void writeQuerySummary(QueryInfo queryInfo) throws Exception {
      if (querySummaryWriter == null) {
        querySummaryWriter = new WriterHolder();
      }
      if (querySummaryWriter.out != null &&
          System.currentTimeMillis() - querySummaryWriter.lastWritingTime >= QUERY_LIST_ROLLING_INTERVAL_MS) {
        LOG.info("Close query history file: " + querySummaryWriter.path);
        // closeWriter() clears out even if close fails, so a closed stream is never reused.
        closeWriter(querySummaryWriter);
      }
      if (querySummaryWriter.out == null) {
        rollingQuerySummaryWriter();
      }
      if (querySummaryWriter.out == null) {
        throw new IOException("Can't open query list history file under " + historyParentPath);
      }

      byte[] jsonBytes = ("\n" + queryInfo.toJson() + "\n").getBytes();

      querySummaryWriter.out.writeInt(jsonBytes.length);
      querySummaryWriter.out.write(jsonBytes);
      querySummaryWriter.out.hflush();
    }

    /**
     * Opens a new query-list file named after the current time,
     * {@code <query history dir>/<yyyyMMdd>/query-list/query-list-<HHmmss>.hist}. Leaves
     * {@code querySummaryWriter.out} unchanged if the directory cannot be created.
     */
    private synchronized void rollingQuerySummaryWriter() throws Exception {
      SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");
      String currentDateTime = df.format(new Date(System.currentTimeMillis()));

      Path datePath = new Path(historyParentPath, currentDateTime.substring(0, 8) + "/" + QUERY_LIST);
      FileSystem fs = getNonCrcFileSystem(datePath, tajoConf);
      if (!fs.exists(datePath)) {
        if (!fs.mkdirs(datePath)) {
          LOG.error("Can't make QueryList history dir: " + datePath);
          return;
        }
      }

      Path historyFile = new Path(datePath, QUERY_LIST + "-" + currentDateTime.substring(8, 14) + HISTORY_FILE_POSTFIX);
      querySummaryWriter.path = historyFile;
      querySummaryWriter.lastWritingTime = System.currentTimeMillis();
      LOG.info("Create query history file: " + historyFile);
      querySummaryWriter.out = fs.create(historyFile);
    }

    /**
     * Appends a task history to the file for the task's start hour. Each record is a 4-byte
     * length followed by the serialized proto.
     */
    private synchronized void writeTaskHistory(TaskHistory taskHistory) throws Exception {
      SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");

      String taskStartTime = df.format(new Date(taskHistory.getStartTime()));

      // Task history files are rolled hourly by task start hour. Writers two or more hours old are
      // closed by closeExpiredTaskWriters(), so normally at most the current and the two previous
      // hours have open writers.
      WriterHolder writerHolder = taskWriters.get(taskStartTime);
      if (writerHolder == null) {
        writerHolder = new WriterHolder();
        writerHolder.out = createTaskHistoryFile(taskStartTime, writerHolder);
        taskWriters.put(taskStartTime, writerHolder);
      }
      writerHolder.lastWritingTime = System.currentTimeMillis();

      if (writerHolder.out != null) {
        byte[] taskHistoryBytes = taskHistory.getProto().toByteArray();
        writerHolder.out.writeInt(taskHistoryBytes.length);
        writerHolder.out.write(taskHistoryBytes);
        // hflush() rather than flush(): on HDFS only hflush() makes the bytes visible to
        // HistoryReader before the file is closed.
        writerHolder.out.hflush();
      }
    }

    /**
     * Creates the next sequence file for {@code taskStartTime} and records its path in
     * {@code writerHolder}.
     *
     * @return the new stream, or null if the parent directory could not be created
     */
    private FSDataOutputStream createTaskHistoryFile(String taskStartTime, WriterHolder writerHolder) throws IOException {
      FileSystem fs = getNonCrcFileSystem(taskHistoryParentPath, tajoConf);
      Path path = getQueryTaskHistoryPath(fs, taskHistoryParentPath, processName, taskStartTime);
      if (!fs.exists(path.getParent())) {
        if (!fs.mkdirs(path.getParent())) {
          LOG.error("Can't make Query history directory: " + path.getParent());
          return null;
        }
      }
      writerHolder.path = path;
      return fs.create(path, false);
    }
  }

  /**
   * Returns the path for a new task history file of {@code processName} for the given hour:
   * {@code <parent>/<yyyyMMdd>/tasks/<processName>/<processName>_<HH>_<seq>.hist}, where
   * {@code seq} is one greater than the largest existing sequence for that hour (0 if none).
   *
   * @param taskStartTime the task start hour formatted as yyyyMMddHH
   * @throws IOException if the process's task directory exists but is not a directory
   */
  public static Path getQueryTaskHistoryPath(FileSystem fs, Path parent,
                                             String processName, String taskStartTime) throws IOException {
    Path fileParent = new Path(parent, taskStartTime.substring(0, 8) + "/tasks/" + processName);

    String hour = taskStartTime.substring(8, 10);
    String filePrefix = processName + "_" + hour + "_";
    int maxSeq = -1;

    if (fs.exists(fileParent)) {
      if (!fs.isDirectory(fileParent)) {
        throw new IOException("Task history path is not directory: " + fileParent);
      }
      FileStatus[] files = fs.listStatus(fileParent);
      if (files != null) {
        for (FileStatus eachFile : files) {
          String name = eachFile.getPath().getName();
          // Match on the full prefix so that process names containing '_' are handled too.
          if (!name.startsWith(filePrefix) || !name.endsWith(HISTORY_FILE_POSTFIX)) {
            continue;
          }
          String seq = name.substring(filePrefix.length(), name.length() - HISTORY_FILE_POSTFIX.length());
          try {
            maxSeq = Math.max(maxSeq, Integer.parseInt(seq));
          } catch (NumberFormatException e) {
            // Not a file written by this class; ignore it.
          }
        }
      }
    }

    return new Path(fileParent, filePrefix + (maxSeq + 1) + HISTORY_FILE_POSTFIX);
  }

  /** An open history file together with its path and the time it was last (re)opened or written. */
  static class WriterHolder {
    long lastWritingTime;
    Path path;
    FSDataOutputStream out;
  }
}
