Repository: incubator-impala Updated Branches: refs/heads/master 3dff390e4 -> b1af24556
IMPALA-3736: Move Impala HTTP handlers to a separate class HTTP handler callbacks take a lot of header room, partly because they usually have their JSON output documented literally. This patch moves ImpalaServer's HTTP handlers out of ImpalaServer itself, into a friend class. Change-Id: I8453b3367653914163ca6acae48e7605f73cc675 Reviewed-on: http://gerrit.cloudera.org:8080/3370 Reviewed-by: Henry Robinson <[email protected]> Tested-by: Henry Robinson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b1af2455 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b1af2455 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b1af2455 Branch: refs/heads/master Commit: b1af24556347b72c3160f66dd9434d537b7daa9b Parents: 3dff390 Author: Henry Robinson <[email protected]> Authored: Fri Jun 10 14:42:18 2016 -0700 Committer: Tim Armstrong <[email protected]> Committed: Tue Jun 14 11:32:53 2016 -0700 ---------------------------------------------------------------------- be/src/service/CMakeLists.txt | 2 +- be/src/service/impala-http-handler.cc | 695 +++++++++++++++++++++++++ be/src/service/impala-http-handler.h | 140 +++++ be/src/service/impala-server-callbacks.cc | 690 ------------------------ be/src/service/impala-server.cc | 4 +- be/src/service/impala-server.h | 108 +--- 6 files changed, 843 insertions(+), 796 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1af2455/be/src/service/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt index 7dc1dd1..2167008 100644 --- a/be/src/service/CMakeLists.txt +++ b/be/src/service/CMakeLists.txt @@ -25,7 +25,7 @@ add_library(Service fragment-mgr.cc hs2-util.cc impala-server.cc - 
impala-server-callbacks.cc + impala-http-handler.cc impala-hs2-server.cc impala-beeswax-server.cc query-exec-state.cc http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1af2455/be/src/service/impala-http-handler.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc new file mode 100644 index 0000000..5b97890 --- /dev/null +++ b/be/src/service/impala-http-handler.cc @@ -0,0 +1,695 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+#include "service/impala-http-handler.h"
+
+#include <sstream>
+#include <boost/thread/mutex.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "catalog/catalog-util.h"
+#include "service/impala-server.h"
+#include "service/query-exec-state.h"
+#include "gen-cpp/beeswax_types.h"
+#include "thrift/protocol/TDebugProtocol.h"
+#include "util/redactor.h"
+#include "util/summary-util.h"
+#include "util/time.h"
+#include "util/url-coding.h"
+#include "util/webserver.h"
+
+#include "common/names.h"
+
+using boost::adopt_lock_t;
+using namespace apache::thrift;
+using namespace beeswax;
+using namespace impala;
+using namespace rapidjson;
+using namespace strings;
+
+DECLARE_int32(query_log_size);
+
+namespace {
+
+// Helper method to turn a class + a method to invoke into a UrlCallback
+template<typename T, class F>
+Webserver::UrlCallback MakeCallback(T* caller, const F& fnc) {
+  return [caller, fnc](const auto& args, auto* doc) {
+    (caller->*fnc)(args, doc);
+  };
+}
+
+// We expect the query id to be passed as one parameter, 'query_id'.
+// Returns OK and sets '*id' if the argument is present and parses as a query id;
+// otherwise returns an error Status describing why it could not be used.
+// (Internal linkage comes from the enclosing anonymous namespace.)
+Status ParseQueryId(const Webserver::ArgumentMap& args, TUniqueId* id) {
+  Webserver::ArgumentMap::const_iterator it = args.find("query_id");
+  if (it == args.end()) {
+    return Status("No 'query_id' argument found");
+  } else {
+    if (ParseId(it->second, id)) return Status::OK();
+    return Status(Substitute("Could not parse 'query_id' argument: $0", it->second));
+  }
+}
+
+}
+
+void ImpalaHttpHandler::RegisterHandlers(Webserver* webserver) {
+  DCHECK(webserver != NULL);
+
+  webserver->RegisterUrlCallback("/hadoop-varz", "hadoop-varz.tmpl",
+      MakeCallback(this, &ImpalaHttpHandler::HadoopVarzHandler));
+
+  webserver->RegisterUrlCallback("/queries", "queries.tmpl",
+      MakeCallback(this, &ImpalaHttpHandler::QueryStateHandler));
+
+  webserver->RegisterUrlCallback("/sessions", "sessions.tmpl",
+      MakeCallback(this, &ImpalaHttpHandler::SessionsHandler));
+
+  webserver->RegisterUrlCallback("/catalog", "catalog.tmpl",
+      MakeCallback(this, &ImpalaHttpHandler::CatalogHandler));
+
+  webserver->RegisterUrlCallback("/catalog_object", "catalog_object.tmpl",
+      MakeCallback(this, &ImpalaHttpHandler::CatalogObjectsHandler), false);
+
+  webserver->RegisterUrlCallback("/query_profile", "query_profile.tmpl",
+      MakeCallback(this, &ImpalaHttpHandler::QueryProfileHandler), false);
+
+  webserver->RegisterUrlCallback("/cancel_query", "common-pre.tmpl",
+      MakeCallback(this, &ImpalaHttpHandler::CancelQueryHandler), false);
+
+  webserver->RegisterUrlCallback("/query_profile_encoded", "raw_text.tmpl",
+      MakeCallback(this, &ImpalaHttpHandler::QueryProfileEncodedHandler), false);
+
+  webserver->RegisterUrlCallback("/inflight_query_ids", "raw_text.tmpl",
+      MakeCallback(this, &ImpalaHttpHandler::InflightQueryIdsHandler), false);
+
+  webserver->RegisterUrlCallback("/query_summary", "query_summary.tmpl",
+      [this](const auto& args, auto* doc) {
+        this->QuerySummaryHandler(false, true, args, doc); }, false);
+
+  webserver->RegisterUrlCallback("/query_plan", "query_plan.tmpl",
+      [this](const auto& args, auto* doc) {
+        this->QuerySummaryHandler(true, true, args, doc); }, false);
+
+  // /query_plan_text and /query_stmt request the same data (no JSON plan, no summary);
+  // they differ only in the template used to render it.
+  webserver->RegisterUrlCallback("/query_plan_text", "query_plan_text.tmpl",
+      [this](const auto& args, auto* doc) {
+        this->QuerySummaryHandler(false, false, args, doc); }, false);
+
+  webserver->RegisterUrlCallback("/query_stmt", "query_stmt.tmpl",
+      [this](const auto& args, auto* doc) {
+        this->QuerySummaryHandler(false, false, args, doc); }, false);
+}
+
+void ImpalaHttpHandler::HadoopVarzHandler(const Webserver::ArgumentMap& args,
+    Document* document) {
+  TGetAllHadoopConfigsResponse response;
+  Status status = server_->exec_env_->frontend()->GetAllHadoopConfigs(&response);
+  if (!status.ok()) {
+    // Surface the failure like the other handlers do instead of silently producing an
+    // empty page.
+    Value error(status.GetDetail().c_str(), document->GetAllocator());
+    document->AddMember("error", error, document->GetAllocator());
+    return;
+  }
+
+  Value configs(kArrayType);
+  typedef map<string, string> ConfigMap;
+  for (const ConfigMap::value_type& config: response.configs) {
+    Value key(config.first.c_str(), document->GetAllocator());
+    Value value(config.second.c_str(), document->GetAllocator());
+    Value config_json(kObjectType);
+    config_json.AddMember("key", key, document->GetAllocator());
+    config_json.AddMember("value", value, document->GetAllocator());
+    configs.PushBack(config_json, document->GetAllocator());
+  }
+  document->AddMember("configs", configs, document->GetAllocator());
+}
+
+void ImpalaHttpHandler::CancelQueryHandler(const Webserver::ArgumentMap& args,
+    Document* document) {
+  TUniqueId unique_id;
+  Status status = ParseQueryId(args, &unique_id);
+  if (!status.ok()) {
+    Value error(status.GetDetail().c_str(), document->GetAllocator());
+    document->AddMember("error", error, document->GetAllocator());
+    return;
+  }
+  Status cause("Cancelled from Impala's debug web interface");
+  status = server_->UnregisterQuery(unique_id, true, &cause);
+  if (!status.ok()) {
+    Value error(status.GetDetail().c_str(), document->GetAllocator());
+    document->AddMember("error", error, document->GetAllocator());
+    return;
+  }
+  Value message("Query cancellation successful", document->GetAllocator());
+  document->AddMember("contents", message, document->GetAllocator());
+}
+
+void ImpalaHttpHandler::QueryProfileHandler(const Webserver::ArgumentMap& args,
+    Document* document) {
+  TUniqueId unique_id;
+  Status parse_status = ParseQueryId(args, &unique_id);
+  if (!parse_status.ok()) {
+    Value error(parse_status.GetDetail().c_str(), document->GetAllocator());
+    document->AddMember("error", error, document->GetAllocator());
+    return;
+  }
+
+  stringstream ss;
+  Status status = server_->GetRuntimeProfileStr(unique_id, false, &ss);
+  if (!status.ok()) {
+    Value error(status.GetDetail().c_str(), document->GetAllocator());
+    document->AddMember("error", error, document->GetAllocator());
+    return;
+  }
+
+  Value profile(ss.str().c_str(), document->GetAllocator());
+  document->AddMember("profile", profile, document->GetAllocator());
+  // Copy the argument into the document. RapidJSON treats a raw 'const char*' value as a
+  // constant (non-copied) string, which would tie the document to the lifetime of
+  // 'args'. ParseQueryId() succeeded, so the lookup cannot return end().
+  Value query_id(args.find("query_id")->second.c_str(), document->GetAllocator());
+  document->AddMember("query_id", query_id, document->GetAllocator());
+}
+
+void ImpalaHttpHandler::QueryProfileEncodedHandler(const Webserver::ArgumentMap& args,
+    Document* document) {
+  TUniqueId unique_id;
+  stringstream ss;
+  Status status = ParseQueryId(args, &unique_id);
+  if (!status.ok()) {
+    ss << status.GetDetail();
+  } else {
+    status = server_->GetRuntimeProfileStr(unique_id, true, &ss);
+    if (!status.ok()) {
+      ss.str(Substitute("Could not obtain runtime profile: $0", status.GetDetail()));
+    }
+  }
+
+  document->AddMember(Webserver::ENABLE_RAW_JSON_KEY, true, document->GetAllocator());
+  Value profile(ss.str().c_str(), document->GetAllocator());
+  document->AddMember("contents", profile, document->GetAllocator());
+}
+
+void ImpalaHttpHandler::InflightQueryIdsHandler(const Webserver::ArgumentMap& args,
+    Document* document) {
+  lock_guard<mutex> l(server_->query_exec_state_map_lock_);
+  stringstream ss;
+  for (const ImpalaServer::QueryExecStateMap::value_type& exec_state:
+       server_->query_exec_state_map_) {
+    ss << exec_state.second->query_id() << "\n";
+  }
+  document->AddMember(Webserver::ENABLE_RAW_JSON_KEY, true, document->GetAllocator());
+  Value query_ids(ss.str().c_str(), document->GetAllocator());
+  document->AddMember("contents", query_ids, document->GetAllocator());
+}
+
+void ImpalaHttpHandler::QueryStateToJson(const ImpalaServer::QueryStateRecord& record,
+    Value* value, Document* document) {
+  Value user(record.effective_user.c_str(), document->GetAllocator());
+  value->AddMember("effective_user", user, document->GetAllocator());
+
+  Value default_db(record.default_db.c_str(), document->GetAllocator());
+  value->AddMember("default_db", default_db, document->GetAllocator());
+
+  // Redact the query string
+  Value stmt(RedactCopy(record.stmt).c_str(), document->GetAllocator());
+  value->AddMember("stmt", stmt, document->GetAllocator());
+
+  Value stmt_type(_TStmtType_VALUES_TO_NAMES.find(record.stmt_type)->second,
+      document->GetAllocator());
+  value->AddMember("stmt_type", stmt_type, document->GetAllocator());
+
+  Value start_time(record.start_time.DebugString().c_str(), document->GetAllocator());
+  value->AddMember("start_time", start_time, document->GetAllocator());
+
+  Value end_time(record.end_time.DebugString().c_str(), document->GetAllocator());
+  value->AddMember("end_time", end_time, document->GetAllocator());
+
+  // If the record has no end time (HasDate() is false), measure the duration up to the
+  // current local time instead.
+  const TimestampValue& end_timestamp =
+      record.end_time.HasDate() ? record.end_time : TimestampValue::LocalTime();
+  double ut_end_time, ut_start_time;
+  double duration = 0.0;
+  if (LIKELY(end_timestamp.ToSubsecondUnixTime(&ut_end_time))
+      && LIKELY(record.start_time.ToSubsecondUnixTime(&ut_start_time))) {
+    duration = ut_end_time - ut_start_time;
+  }
+  const string& printed_duration = PrettyPrinter::Print(duration, TUnit::TIME_S);
+  Value val_duration(printed_duration.c_str(), document->GetAllocator());
+  value->AddMember("duration", val_duration, document->GetAllocator());
+
+  string progress = "N/A";
+  if (record.has_coord) {
+    stringstream ss;
+    ss << record.num_complete_fragments << " / " << record.total_fragments
+       << " (" << setw(4);
+    if (record.total_fragments == 0) {
+      ss << "0%)";
+    } else {
+      ss << (100.0 * record.num_complete_fragments / (1.f * record.total_fragments))
+         << "%)";
+    }
+    progress = ss.str();
+  }
+  Value progress_json(progress.c_str(), document->GetAllocator());
+  value->AddMember("progress", progress_json, document->GetAllocator());
+
+  Value state(_QueryState_VALUES_TO_NAMES.find(record.query_state)->second,
+      document->GetAllocator());
+  value->AddMember("state", state, document->GetAllocator());
+
+  value->AddMember("rows_fetched", record.num_rows_fetched, document->GetAllocator());
+
+  Value query_id(PrintId(record.id).c_str(), document->GetAllocator());
+  value->AddMember("query_id", query_id, document->GetAllocator());
+
+  if (record.event_sequence.labels.size() > 0) {
+    Value last_event(record.event_sequence.labels.back().c_str(),
+        document->GetAllocator());
+    value->AddMember("last_event", last_event, document->GetAllocator());
+  }
+
+  // Waiting to be closed.
+  bool waiting = record.query_state == beeswax::QueryState::EXCEPTION ||
+      record.all_rows_returned;
+  value->AddMember("waiting", waiting, document->GetAllocator());
+  value->AddMember("executing", !waiting, document->GetAllocator());
+
+  int64_t waiting_time = impala::UnixMillis() - record.last_active_time;
+  string waiting_time_str = "";
+  if (waiting_time > 0) {
+    waiting_time_str = PrettyPrinter::Print(waiting_time, TUnit::TIME_MS);
+  }
+  Value val_waiting_time(waiting_time_str.c_str(), document->GetAllocator());
+  value->AddMember("waiting_time", val_waiting_time, document->GetAllocator());
+}
+
+void ImpalaHttpHandler::QueryStateHandler(const Webserver::ArgumentMap& args,
+    Document* document) {
+  set<ImpalaServer::QueryStateRecord, ImpalaServer::QueryStateRecordLessThan>
+      sorted_query_records;
+  {
+    lock_guard<mutex> l(server_->query_exec_state_map_lock_);
+    for (const ImpalaServer::QueryExecStateMap::value_type& exec_state:
+         server_->query_exec_state_map_) {
+      // TODO: Do this in the browser so that sorts on other keys are possible.
+      sorted_query_records.insert(ImpalaServer::QueryStateRecord(*exec_state.second));
+    }
+  }
+
+  Value in_flight_queries(kArrayType);
+  int64_t num_waiting_queries = 0;
+  for (const ImpalaServer::QueryStateRecord& record: sorted_query_records) {
+    Value record_json(kObjectType);
+    QueryStateToJson(record, &record_json, document);
+
+    if (record_json["waiting"].GetBool()) ++num_waiting_queries;
+
+    in_flight_queries.PushBack(record_json, document->GetAllocator());
+  }
+  document->AddMember("in_flight_queries", in_flight_queries, document->GetAllocator());
+  document->AddMember("num_in_flight_queries",
+      static_cast<uint64_t>(sorted_query_records.size()),
+      document->GetAllocator());
+  document->AddMember("num_executing_queries",
+      sorted_query_records.size() - num_waiting_queries,
+      document->GetAllocator());
+  document->AddMember("num_waiting_queries", num_waiting_queries,
+      document->GetAllocator());
+  document->AddMember("waiting-tooltip", "These queries are no longer executing, either "
+      "because they encountered an error or because they have returned all of their "
+      "results, but they are still active so that their results can be inspected. To "
+      "free the resources they are using, they must be closed.",
+      document->GetAllocator());
+
+  Value completed_queries(kArrayType);
+  {
+    lock_guard<mutex> l(server_->query_log_lock_);
+    for (const ImpalaServer::QueryStateRecord& log_entry: server_->query_log_) {
+      Value record_json(kObjectType);
+      QueryStateToJson(log_entry, &record_json, document);
+      completed_queries.PushBack(record_json, document->GetAllocator());
+    }
+  }
+  document->AddMember("completed_queries", completed_queries, document->GetAllocator());
+  document->AddMember("completed_log_size", FLAGS_query_log_size,
+      document->GetAllocator());
+
+  Value query_locations(kArrayType);
+  {
+    lock_guard<mutex> l(server_->query_locations_lock_);
+    for (const ImpalaServer::QueryLocations::value_type& location:
+         server_->query_locations_) {
+      Value location_json(kObjectType);
+      Value location_name(lexical_cast<string>(location.first).c_str(),
+          document->GetAllocator());
+      location_json.AddMember("location", location_name, document->GetAllocator());
+      location_json.AddMember("count", static_cast<uint64_t>(location.second.size()),
+          document->GetAllocator());
+      query_locations.PushBack(location_json, document->GetAllocator());
+    }
+  }
+  document->AddMember("query_locations", query_locations, document->GetAllocator());
+}
+
+void ImpalaHttpHandler::SessionsHandler(const Webserver::ArgumentMap& args,
+    Document* document) {
+  lock_guard<mutex> l(server_->session_state_map_lock_);
+  Value sessions(kArrayType);
+  for (const ImpalaServer::SessionStateMap::value_type& session:
+       server_->session_state_map_) {
+    // Bind by reference: the map entry keeps the session alive while the lock is held,
+    // so there is no need to bump the shared_ptr refcount.
+    const shared_ptr<ImpalaServer::SessionState>& state = session.second;
+    Value session_json(kObjectType);
+    Value type(PrintTSessionType(state->session_type).c_str(),
+        document->GetAllocator());
+    session_json.AddMember("type", type, document->GetAllocator());
+
+    session_json.AddMember("inflight_queries",
+        static_cast<uint64_t>(state->inflight_queries.size()),
+        document->GetAllocator());
+    session_json.AddMember("total_queries", state->total_queries,
+        document->GetAllocator());
+
+    Value user(state->connected_user.c_str(), document->GetAllocator());
+    session_json.AddMember("user", user, document->GetAllocator());
+
+    Value delegated_user(state->do_as_user.c_str(), document->GetAllocator());
+    session_json.AddMember("delegated_user", delegated_user, document->GetAllocator());
+
+    Value session_id(PrintId(session.first).c_str(), document->GetAllocator());
+    session_json.AddMember("session_id", session_id, document->GetAllocator());
+
+    Value network_address(lexical_cast<string>(state->network_address).c_str(),
+        document->GetAllocator());
+    session_json.AddMember("network_address", network_address, document->GetAllocator());
+
+    Value default_db(state->database.c_str(), document->GetAllocator());
+    session_json.AddMember("default_database", default_db, document->GetAllocator());
+
+    Value start_time(state->start_time.DebugString().c_str(), document->GetAllocator());
+    session_json.AddMember("start_time", start_time, document->GetAllocator());
+
+    Value last_accessed(
+        TimestampValue(state->last_accessed_ms / 1000).DebugString().c_str(),
+        document->GetAllocator());
+    session_json.AddMember("last_accessed", last_accessed, document->GetAllocator());
+
+    session_json.AddMember("session_timeout", state->session_timeout,
+        document->GetAllocator());
+    session_json.AddMember("expired", state->expired, document->GetAllocator());
+    session_json.AddMember("closed", state->closed, document->GetAllocator());
+    session_json.AddMember("ref_count", state->ref_count, document->GetAllocator());
+    sessions.PushBack(session_json, document->GetAllocator());
+  }
+
+  document->AddMember("sessions", sessions, document->GetAllocator());
+  document->AddMember("num_sessions",
+      static_cast<uint64_t>(server_->session_state_map_.size()),
+      document->GetAllocator());
+}
+
+void ImpalaHttpHandler::CatalogHandler(const Webserver::ArgumentMap& args,
+    Document* document) {
+  TGetDbsResult get_dbs_result;
+  Status status = server_->exec_env_->frontend()->GetDbs(NULL, NULL, &get_dbs_result);
+  if (!status.ok()) {
+    Value error(status.GetDetail().c_str(), document->GetAllocator());
+    document->AddMember("error", error, document->GetAllocator());
+    return;
+  }
+
+  Value databases(kArrayType);
+  for (const TDatabase& db: get_dbs_result.dbs) {
+    Value database(kObjectType);
+    Value str(db.db_name.c_str(), document->GetAllocator());
+    database.AddMember("name", str, document->GetAllocator());
+
+    TGetTablesResult get_table_results;
+    Status table_status = server_->exec_env_->frontend()->GetTableNames(
+        db.db_name, NULL, NULL, &get_table_results);
+    if (!table_status.ok()) {
+      // Still emit the database so that the error attached to it reaches the page;
+      // skipping the PushBack() would silently drop both.
+      Value error(table_status.GetDetail().c_str(), document->GetAllocator());
+      database.AddMember("error", error, document->GetAllocator());
+      databases.PushBack(database, document->GetAllocator());
+      continue;
+    }
+
+    Value table_array(kArrayType);
+    for (const string& table: get_table_results.tables) {
+      Value table_obj(kObjectType);
+      Value fq_name(Substitute("$0.$1", db.db_name, table).c_str(),
+          document->GetAllocator());
+      table_obj.AddMember("fqtn", fq_name, document->GetAllocator());
+      Value table_name(table.c_str(), document->GetAllocator());
+      table_obj.AddMember("name", table_name, document->GetAllocator());
+      table_array.PushBack(table_obj, document->GetAllocator());
+    }
+    database.AddMember("num_tables", table_array.Size(), document->GetAllocator());
+    database.AddMember("tables", table_array, document->GetAllocator());
+    databases.PushBack(database, document->GetAllocator());
+  }
+  document->AddMember("databases", databases, document->GetAllocator());
+}
+
+void ImpalaHttpHandler::CatalogObjectsHandler(const Webserver::ArgumentMap& args,
+    Document* document) {
+  Webserver::ArgumentMap::const_iterator object_type_arg = args.find("object_type");
+  Webserver::ArgumentMap::const_iterator object_name_arg = args.find("object_name");
+  if (object_type_arg != args.end() && object_name_arg != args.end()) {
+    TCatalogObjectType::type object_type =
+        TCatalogObjectTypeFromName(object_type_arg->second);
+
+    // Get the object type and name from the topic entry key
+    TCatalogObject request;
+    TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request);
+
+    // Get the object and dump its contents.
+    TCatalogObject result;
+    Status status = server_->exec_env_->frontend()->GetCatalogObject(request, &result);
+    if (status.ok()) {
+      Value debug_string(ThriftDebugString(result).c_str(), document->GetAllocator());
+      document->AddMember("thrift_string", debug_string, document->GetAllocator());
+    } else {
+      Value error(status.GetDetail().c_str(), document->GetAllocator());
+      document->AddMember("error", error, document->GetAllocator());
+    }
+  } else {
+    Value error("Please specify values for the object_type and object_name parameters.",
+        document->GetAllocator());
+    document->AddMember("error", error, document->GetAllocator());
+  }
+}
+
+namespace {
+
+// Helper for PlanToJson(), processes a single list of plan nodes which are the
+// DFS-flattened representation of a single plan fragment. Called recursively, the
+// iterator parameter is updated in place so that when a recursive call returns, the
+// caller is pointing at the next of its children.
+void PlanToJsonHelper(const map<TPlanNodeId, TPlanNodeExecSummary>& summaries,
+    const vector<TPlanNode>& nodes,
+    vector<TPlanNode>::const_iterator* it, rapidjson::Document* document, Value* value) {
+  Value children(kArrayType);
+  // Copy the label into the document's allocator: 'nodes' belongs to the caller and may
+  // be destroyed before 'document' is rendered, so a non-copied string would dangle.
+  Value label((*it)->label.c_str(), document->GetAllocator());
+  value->AddMember("label", label, document->GetAllocator());
+  // Node "details" may contain exprs which should be redacted.
+  Value label_detail(RedactCopy((*it)->label_detail).c_str(), document->GetAllocator());
+  value->AddMember("label_detail", label_detail, document->GetAllocator());
+
+  TPlanNodeId id = (*it)->node_id;
+  map<TPlanNodeId, TPlanNodeExecSummary>::const_iterator summary = summaries.find(id);
+  if (summary != summaries.end()) {
+    int64_t cardinality = 0;
+    int64_t max_time = 0L;
+    int64_t total_time = 0;
+    for (const TExecStats& stat: summary->second.exec_stats) {
+      if (summary->second.is_broadcast) {
+        // Avoid multiple-counting for recipients of broadcasts.
+        cardinality = ::max(cardinality, stat.cardinality);
+      } else {
+        cardinality += stat.cardinality;
+      }
+      total_time += stat.latency_ns;
+      max_time = ::max(max_time, stat.latency_ns);
+    }
+    value->AddMember("output_card", cardinality, document->GetAllocator());
+    value->AddMember("num_instances",
+        static_cast<uint64_t>(summary->second.exec_stats.size()),
+        document->GetAllocator());
+    if (summary->second.is_broadcast) {
+      value->AddMember("is_broadcast", true, document->GetAllocator());
+    }
+
+    const string& max_time_str = PrettyPrinter::Print(max_time, TUnit::TIME_NS);
+    Value max_time_str_json(max_time_str.c_str(), document->GetAllocator());
+    value->AddMember("max_time", max_time_str_json, document->GetAllocator());
+    value->AddMember("max_time_val", max_time, document->GetAllocator());
+
+    // Round to the nearest ns, to workaround a bug in pretty-printing a fraction of a
+    // ns. See IMPALA-1800.
+    const string& avg_time_str = PrettyPrinter::Print(
+        // A bug may occasionally cause 1-instance nodes to appear to have 0 instances.
+        total_time / ::max(static_cast<int>(summary->second.exec_stats.size()), 1),
+        TUnit::TIME_NS);
+    Value avg_time_str_json(avg_time_str.c_str(), document->GetAllocator());
+    value->AddMember("avg_time", avg_time_str_json, document->GetAllocator());
+  }
+
+  int num_children = (*it)->num_children;
+  for (int i = 0; i < num_children; ++i) {
+    ++(*it);
+    Value container(kObjectType);
+    PlanToJsonHelper(summaries, nodes, it, document, &container);
+    children.PushBack(container, document->GetAllocator());
+  }
+  value->AddMember("children", children, document->GetAllocator());
+}
+
+// Helper method which converts a list of plan fragments into a single JSON document, with
+// the following schema:
+// "plan_nodes": [
+//     {
+//       "label": "12:AGGREGATE",
+//       "label_detail": "FINALIZE",
+//       "output_card": 23456,
+//       "num_instances": 34,
+//       "max_time": "1m23s",
+//       "avg_time": "1.3ms",
+//       "children": [
+//           {
+//             "label": "11:EXCHANGE",
+//             "label_detail": "UNPARTITIONED",
+//             "children": []
+//           }
+//       ]
+//     },
+//     {
+//       "label": "07:AGGREGATE",
+//       "label_detail": "",
+//       "children": [],
+//       "data_stream_target": "11:EXCHANGE"
+//     }
+// ]
+void PlanToJson(const vector<TPlanFragment>& fragments, const TExecSummary& summary,
+    rapidjson::Document* document, Value* value) {
+  // Build a map from id to label so that we can resolve the targets of data-stream sinks
+  // and connect plan fragments.
+  map<TPlanNodeId, string> label_map;
+  for (const TPlanFragment& fragment: fragments) {
+    for (const TPlanNode& node: fragment.plan.nodes) {
+      label_map[node.node_id] = node.label;
+    }
+  }
+
+  map<TPlanNodeId, TPlanNodeExecSummary> exec_summaries;
+  for (const TPlanNodeExecSummary& s: summary.nodes) {
+    exec_summaries[s.node_id] = s;
+  }
+
+  Value nodes(kArrayType);
+  for (const TPlanFragment& fragment: fragments) {
+    Value plan_fragment(kObjectType);
+    vector<TPlanNode>::const_iterator it = fragment.plan.nodes.begin();
+    PlanToJsonHelper(exec_summaries, fragment.plan.nodes, &it, document, &plan_fragment);
+    if (fragment.__isset.output_sink) {
+      const TDataSink& sink = fragment.output_sink;
+      if (sink.__isset.stream_sink) {
+        // 'label_map' is local to this function, so the target label must be copied
+        // into the document rather than referenced.
+        Value target(label_map[sink.stream_sink.dest_node_id].c_str(),
+            document->GetAllocator());
+        plan_fragment.AddMember("data_stream_target", target, document->GetAllocator());
+      }
+    }
+    nodes.PushBack(plan_fragment, document->GetAllocator());
+  }
+  value->AddMember("plan_nodes", nodes, document->GetAllocator());
+}
+
+}
+
+void ImpalaHttpHandler::QuerySummaryHandler(bool include_plan_json, bool include_summary,
+    const Webserver::ArgumentMap& args, Document* document) {
+  TUniqueId query_id;
+  Status status = ParseQueryId(args, &query_id);
+  if (!status.ok()) {
+    // Redact the error message, it may contain part or all of the query.
+    Value json_error(RedactCopy(status.GetDetail()).c_str(), document->GetAllocator());
+    document->AddMember("error", json_error, document->GetAllocator());
+    return;
+  }
+
+  TExecSummary summary;
+  string stmt;
+  string plan;
+  Status query_status;
+  bool found = false;
+  vector<TPlanFragment> fragments;
+
+  // Search the in-flight queries first, followed by the archived ones.
+  {
+    shared_ptr<ImpalaServer::QueryExecState> exec_state =
+        server_->GetQueryExecState(query_id, true);
+    if (exec_state != NULL) {
+      found = true;
+      // GetQueryExecState() was asked to return the state locked; adopt that lock so it
+      // is released when this scope ends.
+      lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
+      if (exec_state->coord() == NULL) {
+        const string& err = Substitute("Invalid query id: $0", PrintId(query_id));
+        Value json_error(err.c_str(), document->GetAllocator());
+        document->AddMember("error", json_error, document->GetAllocator());
+        return;
+      }
+      query_status = exec_state->query_status();
+      stmt = exec_state->sql_stmt();
+      plan = exec_state->exec_request().query_exec_request.query_plan;
+      if (include_plan_json || include_summary) {
+        lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
+        summary = exec_state->coord()->exec_summary();
+      }
+      if (include_plan_json) {
+        fragments = exec_state->exec_request().query_exec_request.fragments;
+      }
+    }
+  }
+
+  if (!found) {
+    lock_guard<mutex> l(server_->query_log_lock_);
+    ImpalaServer::QueryLogIndex::const_iterator query_record =
+        server_->query_log_index_.find(query_id);
+    if (query_record == server_->query_log_index_.end()) {
+      const string& err = Substitute("Unknown query id: $0", PrintId(query_id));
+      Value json_error(err.c_str(), document->GetAllocator());
+      document->AddMember("error", json_error, document->GetAllocator());
+      return;
+    }
+    if (include_plan_json || include_summary) {
+      summary = query_record->second->exec_summary;
+    }
+    stmt = query_record->second->stmt;
+    plan = query_record->second->plan;
+    query_status = query_record->second->query_status;
+    if (include_plan_json) {
+      fragments = query_record->second->fragments;
+    }
+  }
+
+  if (include_plan_json) {
+    Value v(kObjectType);
+    PlanToJson(fragments, summary, document, &v);
+    document->AddMember("plan_json", v, document->GetAllocator());
+  }
+  if (include_summary) {
+    const string& printed_summary = PrintExecSummary(summary);
+    Value json_summary(printed_summary.c_str(), document->GetAllocator());
+    document->AddMember("summary", json_summary, document->GetAllocator());
+  }
+  Value json_stmt(RedactCopy(stmt).c_str(), document->GetAllocator());
+  document->AddMember("stmt", json_stmt, document->GetAllocator());
+  Value json_plan_text(RedactCopy(plan).c_str(), document->GetAllocator());
+  document->AddMember("plan", json_plan_text, document->GetAllocator());
+
+  // Redact the error in case the query is contained in the error message.
+  Value json_status(query_status.ok() ? "OK" :
+      RedactCopy(query_status.GetDetail()).c_str(), document->GetAllocator());
+  document->AddMember("status", json_status, document->GetAllocator());
+  Value json_id(PrintId(query_id).c_str(), document->GetAllocator());
+  document->AddMember("query_id", json_id, document->GetAllocator());
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1af2455/be/src/service/impala-http-handler.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.h b/be/src/service/impala-http-handler.h
new file mode 100644
index 0000000..e5c41f9
--- /dev/null
+++ b/be/src/service/impala-http-handler.h
@@ -0,0 +1,140 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_SERVICE_IMPALA_HTTP_HANDLER_H
+#define IMPALA_SERVICE_IMPALA_HTTP_HANDLER_H
+
+#include <rapidjson/document.h>
+#include "util/webserver.h"
+
+#include "service/impala-server.h"
+
+namespace impala {
+
+/// Handles all webserver callbacks for an ImpalaServer. This class is a friend of
+/// ImpalaServer in order to access the internal state needed to generate the debug
+/// webpages.
+class ImpalaHttpHandler {
+ public:
+  /// 'server' is not owned and must outlive this handler.
+  explicit ImpalaHttpHandler(ImpalaServer* server) : server_(server) { }
+
+  /// Registers all the per-Impalad webserver callbacks with 'webserver'. The registered
+  /// callbacks call back into this object, so it must stay alive for as long as
+  /// 'webserver' may invoke them.
+  void RegisterHandlers(Webserver* webserver);
+
+ private:
+  /// The server whose internal state the debug pages render. Not owned.
+  ImpalaServer* server_;
+
+  /// Json callback for /hadoop-varz. Produces Json with a list, 'configs', of (key,
+  /// value) pairs, one for each Hadoop configuration value.
+  void HadoopVarzHandler(const Webserver::ArgumentMap& args,
+      rapidjson::Document* document);
+
+  /// Json callback for /queries. Returns two sorted lists of queries, one in-flight and
+  /// one completed, along with counts of executing and waiting queries and a list of
+  /// backend locations. Each query record is rendered by QueryStateToJson().
+  ///
+  /// "in_flight_queries": [],
+  /// "num_in_flight_queries": 0,
+  /// "num_executing_queries": 0,
+  /// "num_waiting_queries": 0,
+  /// "waiting-tooltip": "<explanatory text>",
+  /// "completed_queries": [
+  ///   {
+  ///     "effective_user": "henry",
+  ///     "default_db": "default",
+  ///     "stmt": "select sleep(10000)",
+  ///     "stmt_type": "QUERY",
+  ///     "start_time": "2014-08-07 18:37:47.923614000",
+  ///     "end_time": "2014-08-07 18:37:58.146494000",
+  ///     "duration": "<pretty-printed duration>",
+  ///     "progress": "0 / 0 (0%)",
+  ///     "state": "FINISHED",
+  ///     "rows_fetched": 1,
+  ///     "query_id": "7c459a59fb8cefe3:8b7042d55bf19887",
+  ///     "last_event": "<label of the most recent event, only if any exist>",
+  ///     "waiting": true,
+  ///     "executing": false,
+  ///     "waiting_time": "<pretty-printed time since last activity>"
+  ///   }
+  /// ],
+  /// "completed_log_size": 25,
+  /// "query_locations": [
+  ///   {
+  ///     "location": "henry-impala:22000",
+  ///     "count": 0
+  ///   }
+  /// ]
+  void QueryStateHandler(const Webserver::ArgumentMap& args,
+      rapidjson::Document* document);
+
+  /// Json callback for /query_profile. Expects query_id as an argument, produces Json
+  /// with 'profile' set to the profile string, and 'query_id' set to the query ID. If
+  /// the id is missing or invalid, or the profile cannot be obtained, only 'error' is
+  /// set.
+  void QueryProfileHandler(const Webserver::ArgumentMap& args,
+      rapidjson::Document* document);
+
+  /// Webserver callback behind /query_summary, /query_plan, /query_plan_text and
+  /// /query_stmt. Looks the query up among in-flight queries first and then in the
+  /// query log, and produces a Json structure with query summary information.
+  /// Example:
+  /// { "summary": <....>,
+  ///   "plan": <....>,
+  ///   "stmt": "select count(*) from functional.alltypes",
+  ///   "query_id": <...>,
+  ///   "status": "OK"
+  /// }
+  /// If include_plan_json is true, 'plan_json' will be set to a JSON representation of
+  /// the query plan. If include_summary is true, 'summary' will be a text rendering of
+  /// the query summary. If the query id is missing, malformed or unknown, only 'error'
+  /// is set.
+  void QuerySummaryHandler(bool include_plan_json, bool include_summary,
+      const Webserver::ArgumentMap& args, rapidjson::Document* document);
+
+  /// Cancels an in-flight query and writes the result to 'contents', or the reason for
+  /// failure to 'error'.
+  void CancelQueryHandler(const Webserver::ArgumentMap& args,
+      rapidjson::Document* document);
+
+  /// Upon return, 'document' will contain the query profile as a base64 encoded object in
+  /// 'contents'. If the query id is invalid or the profile cannot be obtained,
+  /// 'contents' holds the error message instead.
+  void QueryProfileEncodedHandler(const Webserver::ArgumentMap& args,
+      rapidjson::Document* document);
+
+  /// Produces a list of inflight query IDs printed as text in 'contents'.
+  void InflightQueryIdsHandler(const Webserver::ArgumentMap& args,
+      rapidjson::Document* document);
+
+  /// Json callback for /sessions, which prints a table of active client sessions.
+  /// "sessions": [
+  ///   {
+  ///     "type": "BEESWAX",
+  ///     "inflight_queries": 0,
+  ///     "total_queries": 0,
+  ///     "user": "",
+  ///     "delegated_user": "",
+  ///     "session_id": "6242f69b02e4d609:ac84df1fbb0e16a3",
+  ///     "network_address": "127.0.0.1:46484",
+  ///     "default_database": "default",
+  ///     "start_time": "2014-08-07 22:50:49",
+  ///     "last_accessed": "2014-08-07 22:50:49",
+  ///     "session_timeout": 0,
+  ///     "expired": false,
+  ///     "closed": false,
+  ///     "ref_count": 0
+  ///   }
+  /// ],
+  /// "num_sessions": 1
+  void SessionsHandler(const Webserver::ArgumentMap& args,
+      rapidjson::Document* document);
+
+  /// Json callback for /catalog. Produces 'databases', a list of all known databases,
+  /// each with a 'name', 'num_tables' and a list of 'tables' (each with a 'name' and a
+  /// fully-qualified 'fqtn'). Sets 'error' if the list of databases cannot be retrieved.
+  void CatalogHandler(const Webserver::ArgumentMap& args,
+      rapidjson::Document* document);
+
+  /// Json callback for /catalog_object. Expects 'object_type' and 'object_name'
+  /// arguments and sets 'thrift_string' to a debug dump of the matching catalog object,
+  /// or 'error' if the arguments are missing or the lookup fails.
+  void CatalogObjectsHandler(const Webserver::ArgumentMap& args,
+      rapidjson::Document* document);
+
+  /// Helper method to render a single QueryStateRecord as a Json object. Used by
+  /// QueryStateHandler().
+  void QueryStateToJson(const ImpalaServer::QueryStateRecord& record,
+      rapidjson::Value* value, rapidjson::Document* document);
+};
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1af2455/be/src/service/impala-server-callbacks.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server-callbacks.cc b/be/src/service/impala-server-callbacks.cc
deleted file mode 100644
index 88d4fa0..0000000
--- a/be/src/service/impala-server-callbacks.cc
+++ /dev/null
@@ -1,690 +0,0 @@
-// Copyright 2012 Cloudera Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "service/impala-server.h" - -#include <sstream> -#include <boost/thread/mutex.hpp> -#include <gutil/strings/substitute.h> - -#include "catalog/catalog-util.h" -#include "service/query-exec-state.h" -#include "util/webserver.h" - -#include "gen-cpp/beeswax_types.h" -#include "thrift/protocol/TDebugProtocol.h" -#include "util/redactor.h" -#include "util/summary-util.h" -#include "util/time.h" -#include "util/url-coding.h" - -#include "common/names.h" - -using boost::adopt_lock_t; -using namespace apache::thrift; -using namespace beeswax; -using namespace impala; -using namespace rapidjson; -using namespace strings; - -DECLARE_int32(query_log_size); - -void ImpalaServer::RegisterWebserverCallbacks(Webserver* webserver) { - DCHECK(webserver != NULL); - - Webserver::UrlCallback hadoop_varz_callback = - bind<void>(mem_fn(&ImpalaServer::HadoopVarzUrlCallback), this, _1, _2); - webserver->RegisterUrlCallback("/hadoop-varz", "hadoop-varz.tmpl", - hadoop_varz_callback); - - Webserver::UrlCallback query_json_callback = - bind<void>(mem_fn(&ImpalaServer::QueryStateUrlCallback), this, _1, _2); - webserver->RegisterUrlCallback("/queries", "queries.tmpl", - query_json_callback); - - Webserver::UrlCallback sessions_json_callback = - bind<void>(mem_fn(&ImpalaServer::SessionsUrlCallback), this, _1, _2); - webserver->RegisterUrlCallback("/sessions", "sessions.tmpl", - sessions_json_callback); - - Webserver::UrlCallback catalog_callback = - bind<void>(mem_fn(&ImpalaServer::CatalogUrlCallback), this, _1, _2); - webserver->RegisterUrlCallback("/catalog", 
"catalog.tmpl", - catalog_callback); - - Webserver::UrlCallback catalog_objects_callback = - bind<void>(mem_fn(&ImpalaServer::CatalogObjectsUrlCallback), this, _1, _2); - webserver->RegisterUrlCallback("/catalog_object", "catalog_object.tmpl", - catalog_objects_callback, false); - - Webserver::UrlCallback profile_callback = - bind<void>(mem_fn(&ImpalaServer::QueryProfileUrlCallback), this, _1, _2); - webserver->RegisterUrlCallback("/query_profile", "query_profile.tmpl", - profile_callback, false); - - Webserver::UrlCallback cancel_callback = - bind<void>(mem_fn(&ImpalaServer::CancelQueryUrlCallback), this, _1, _2); - webserver->RegisterUrlCallback("/cancel_query", "common-pre.tmpl", cancel_callback, - false); - - Webserver::UrlCallback profile_encoded_callback = - bind<void>(mem_fn(&ImpalaServer::QueryProfileEncodedUrlCallback), this, _1, _2); - webserver->RegisterUrlCallback("/query_profile_encoded", "raw_text.tmpl", - profile_encoded_callback, false); - - Webserver::UrlCallback inflight_query_ids_callback = - bind<void>(mem_fn(&ImpalaServer::InflightQueryIdsUrlCallback), this, _1, _2); - webserver->RegisterUrlCallback("/inflight_query_ids", "raw_text.tmpl", - inflight_query_ids_callback, false); - - Webserver::UrlCallback query_summary_callback = - bind<void>(mem_fn(&ImpalaServer::QuerySummaryCallback), this, false, true, _1, _2); - webserver->RegisterUrlCallback("/query_summary", "query_summary.tmpl", - query_summary_callback, false); - - Webserver::UrlCallback query_plan_callback = - bind<void>(mem_fn(&ImpalaServer::QuerySummaryCallback), this, true, true, _1, _2); - webserver->RegisterUrlCallback("/query_plan", "query_plan.tmpl", - query_plan_callback, false); - - Webserver::UrlCallback query_plan_text_callback = - bind<void>(mem_fn(&ImpalaServer::QuerySummaryCallback), this, false, false, _1, _2); - webserver->RegisterUrlCallback("/query_plan_text", "query_plan_text.tmpl", - query_plan_text_callback, false); - webserver->RegisterUrlCallback("/query_stmt", 
"query_stmt.tmpl", - query_plan_text_callback, false); -} - -void ImpalaServer::HadoopVarzUrlCallback(const Webserver::ArgumentMap& args, - Document* document) { - TGetAllHadoopConfigsResponse response; - Status status = exec_env_->frontend()->GetAllHadoopConfigs(&response); - if (!status.ok()) return; - - Value configs(kArrayType); - typedef map<string, string> ConfigMap; - for (const ConfigMap::value_type& config: response.configs) { - Value key(config.first.c_str(), document->GetAllocator()); - Value value(config.second.c_str(), document->GetAllocator()); - Value config_json(kObjectType); - config_json.AddMember("key", key, document->GetAllocator()); - config_json.AddMember("value", value, document->GetAllocator()); - configs.PushBack(config_json, document->GetAllocator()); - } - document->AddMember("configs", configs, document->GetAllocator()); -} - -// We expect the query id to be passed as one parameter, 'query_id'. -// Returns true if the query id was present and valid; false otherwise. 
-static Status ParseQueryId(const Webserver::ArgumentMap& args, TUniqueId* id) { - Webserver::ArgumentMap::const_iterator it = args.find("query_id"); - if (it == args.end()) { - return Status("No 'query_id' argument found"); - } else { - if (ParseId(it->second, id)) return Status::OK(); - return Status(Substitute("Could not parse 'query_id' argument: $0", it->second)); - } -} - -void ImpalaServer::CancelQueryUrlCallback(const Webserver::ArgumentMap& args, - Document* document) { - TUniqueId unique_id; - Status status = ParseQueryId(args, &unique_id); - if (!status.ok()) { - Value error(status.GetDetail().c_str(), document->GetAllocator()); - document->AddMember("error", error, document->GetAllocator()); - return; - } - Status cause("Cancelled from Impala's debug web interface"); - status = UnregisterQuery(unique_id, true, &cause); - if (!status.ok()) { - Value error(status.GetDetail().c_str(), document->GetAllocator()); - document->AddMember("error", error, document->GetAllocator()); - return; - } - Value message("Query cancellation successful", document->GetAllocator()); - document->AddMember("contents", message, document->GetAllocator()); -} - -void ImpalaServer::QueryProfileUrlCallback(const Webserver::ArgumentMap& args, - Document* document) { - TUniqueId unique_id; - Status parse_status = ParseQueryId(args, &unique_id); - if (!parse_status.ok()) { - Value error(parse_status.GetDetail().c_str(), document->GetAllocator()); - document->AddMember("error", error, document->GetAllocator()); - return; - } - - stringstream ss; - Status status = GetRuntimeProfileStr(unique_id, false, &ss); - if (!status.ok()) { - Value error(status.GetDetail().c_str(), document->GetAllocator()); - document->AddMember("error", error, document->GetAllocator()); - return; - } - - Value profile(ss.str().c_str(), document->GetAllocator()); - document->AddMember("profile", profile, document->GetAllocator()); - document->AddMember("query_id", args.find("query_id")->second.c_str(), - 
document->GetAllocator()); -} - -void ImpalaServer::QueryProfileEncodedUrlCallback(const Webserver::ArgumentMap& args, - Document* document) { - TUniqueId unique_id; - stringstream ss; - Status status = ParseQueryId(args, &unique_id); - if (!status.ok()) { - ss << status.GetDetail(); - } else { - Status status = GetRuntimeProfileStr(unique_id, true, &ss); - if (!status.ok()) { - ss.str(Substitute("Could not obtain runtime profile: $0", status.GetDetail())); - } - } - - document->AddMember(Webserver::ENABLE_RAW_JSON_KEY, true, document->GetAllocator()); - Value profile(ss.str().c_str(), document->GetAllocator()); - document->AddMember("contents", profile, document->GetAllocator()); -} - -void ImpalaServer::InflightQueryIdsUrlCallback(const Webserver::ArgumentMap& args, - Document* document) { - lock_guard<mutex> l(query_exec_state_map_lock_); - stringstream ss; - for (const QueryExecStateMap::value_type& exec_state: query_exec_state_map_) { - ss << exec_state.second->query_id() << "\n"; - } - document->AddMember(Webserver::ENABLE_RAW_JSON_KEY, true, document->GetAllocator()); - Value query_ids(ss.str().c_str(), document->GetAllocator()); - document->AddMember("contents", query_ids, document->GetAllocator()); -} - -void ImpalaServer::QueryStateToJson(const ImpalaServer::QueryStateRecord& record, - Value* value, Document* document) { - Value user(record.effective_user.c_str(), document->GetAllocator()); - value->AddMember("effective_user", user, document->GetAllocator()); - - Value default_db(record.default_db.c_str(), document->GetAllocator()); - value->AddMember("default_db", default_db, document->GetAllocator()); - - // Redact the query string - Value stmt(RedactCopy(record.stmt).c_str(), document->GetAllocator()); - value->AddMember("stmt", stmt, document->GetAllocator()); - - Value stmt_type(_TStmtType_VALUES_TO_NAMES.find(record.stmt_type)->second, - document->GetAllocator()); - value->AddMember("stmt_type", stmt_type, document->GetAllocator()); - - Value 
start_time(record.start_time.DebugString().c_str(), document->GetAllocator()); - value->AddMember("start_time", start_time, document->GetAllocator()); - - Value end_time(record.end_time.DebugString().c_str(), document->GetAllocator()); - value->AddMember("end_time", end_time, document->GetAllocator()); - - const TimestampValue& end_timestamp = - record.end_time.HasDate() ? record.end_time : TimestampValue::LocalTime(); - double ut_end_time, ut_start_time; - double duration = 0.0; - if (LIKELY(end_timestamp.ToSubsecondUnixTime(&ut_end_time)) - && LIKELY(record.start_time.ToSubsecondUnixTime(&ut_start_time))) { - duration = ut_end_time - ut_start_time; - } - const string& printed_duration = PrettyPrinter::Print(duration, TUnit::TIME_S); - Value val_duration(printed_duration.c_str(), document->GetAllocator()); - value->AddMember("duration", val_duration, document->GetAllocator()); - - string progress = "N/A"; - if (record.has_coord) { - stringstream ss; - ss << record.num_complete_fragments << " / " << record.total_fragments - << " (" << setw(4); - if (record.total_fragments == 0) { - ss << "0%)"; - } else { - ss << (100.0 * record.num_complete_fragments / (1.f * record.total_fragments)) - << "%)"; - } - progress = ss.str(); - } - Value progress_json(progress.c_str(), document->GetAllocator()); - value->AddMember("progress", progress_json, document->GetAllocator()); - - Value state(_QueryState_VALUES_TO_NAMES.find(record.query_state)->second, - document->GetAllocator()); - value->AddMember("state", state, document->GetAllocator()); - - value->AddMember("rows_fetched", record.num_rows_fetched, document->GetAllocator()); - - Value query_id(PrintId(record.id).c_str(), document->GetAllocator()); - value->AddMember("query_id", query_id, document->GetAllocator()); - - if (record.event_sequence.labels.size() > 0) { - Value last_event(record.event_sequence.labels.back().c_str(), - document->GetAllocator()); - value->AddMember("last_event", last_event, 
document->GetAllocator()); - } - - // Waiting to be closed. - bool waiting = record.query_state == beeswax::QueryState::EXCEPTION || - record.all_rows_returned; - value->AddMember("waiting", waiting, document->GetAllocator()); - value->AddMember("executing", !waiting, document->GetAllocator()); - - int64_t waiting_time = impala::UnixMillis() - record.last_active_time; - string waiting_time_str = ""; - if (waiting_time > 0) { - waiting_time_str = PrettyPrinter::Print(waiting_time, TUnit::TIME_MS); - } - Value val_waiting_time(waiting_time_str.c_str(), document->GetAllocator()); - value->AddMember("waiting_time", val_waiting_time, document->GetAllocator()); -} - -void ImpalaServer::QueryStateUrlCallback(const Webserver::ArgumentMap& args, - Document* document) { - set<QueryStateRecord, QueryStateRecordLessThan> sorted_query_records; - { - lock_guard<mutex> l(query_exec_state_map_lock_); - for(const QueryExecStateMap::value_type& exec_state: query_exec_state_map_) { - // TODO: Do this in the browser so that sorts on other keys are possible. 
- sorted_query_records.insert(QueryStateRecord(*exec_state.second)); - } - } - - Value in_flight_queries(kArrayType); - int64_t num_waiting_queries = 0; - for (const QueryStateRecord& record: sorted_query_records) { - Value record_json(kObjectType); - QueryStateToJson(record, &record_json, document); - - if (record_json["waiting"].GetBool()) ++num_waiting_queries; - - in_flight_queries.PushBack(record_json, document->GetAllocator()); - } - document->AddMember("in_flight_queries", in_flight_queries, document->GetAllocator()); - document->AddMember("num_in_flight_queries", - static_cast<uint64_t>(sorted_query_records.size()), - document->GetAllocator()); - document->AddMember("num_executing_queries", - sorted_query_records.size() - num_waiting_queries, - document->GetAllocator()); - document->AddMember("num_waiting_queries", num_waiting_queries, - document->GetAllocator()); - document->AddMember("waiting-tooltip", "These queries are no longer executing, either " - "because they encountered an error or because they have returned all of their " - "results, but they are still active so that their results can be inspected. 
To " - "free the resources they are using, they must be closed.", - document->GetAllocator()); - - Value completed_queries(kArrayType); - { - lock_guard<mutex> l(query_log_lock_); - for (const QueryStateRecord& log_entry: query_log_) { - Value record_json(kObjectType); - QueryStateToJson(log_entry, &record_json, document); - completed_queries.PushBack(record_json, document->GetAllocator()); - } - } - document->AddMember("completed_queries", completed_queries, document->GetAllocator()); - document->AddMember("completed_log_size", FLAGS_query_log_size, - document->GetAllocator()); - - Value query_locations(kArrayType); - { - lock_guard<mutex> l(query_locations_lock_); - for (const QueryLocations::value_type& location: query_locations_) { - Value location_json(kObjectType); - Value location_name(lexical_cast<string>(location.first).c_str(), - document->GetAllocator()); - location_json.AddMember("location", location_name, document->GetAllocator()); - location_json.AddMember("count", static_cast<uint64_t>(location.second.size()), - document->GetAllocator()); - query_locations.PushBack(location_json, document->GetAllocator()); - } - } - document->AddMember("query_locations", query_locations, document->GetAllocator()); -} - - -void ImpalaServer::SessionsUrlCallback(const Webserver::ArgumentMap& args, - Document* document) { - lock_guard<mutex> l(session_state_map_lock_); - Value sessions(kArrayType); - for (const SessionStateMap::value_type& session: session_state_map_) { - shared_ptr<SessionState> state = session.second; - Value session_json(kObjectType); - Value type(PrintTSessionType(state->session_type).c_str(), - document->GetAllocator()); - session_json.AddMember("type", type, document->GetAllocator()); - - session_json.AddMember("inflight_queries", - static_cast<uint64_t>(state->inflight_queries.size()), - document->GetAllocator()); - session_json.AddMember("total_queries", state->total_queries, - document->GetAllocator()); - - Value 
user(state->connected_user.c_str(), document->GetAllocator()); - session_json.AddMember("user", user, document->GetAllocator()); - - Value delegated_user(state->do_as_user.c_str(), document->GetAllocator()); - session_json.AddMember("delegated_user", delegated_user, document->GetAllocator()); - - Value session_id(PrintId(session.first).c_str(), document->GetAllocator()); - session_json.AddMember("session_id", session_id, document->GetAllocator()); - - Value network_address(lexical_cast<string>(state->network_address).c_str(), - document->GetAllocator()); - session_json.AddMember("network_address", network_address, document->GetAllocator()); - - Value default_db(state->database.c_str(), document->GetAllocator()); - session_json.AddMember("default_database", default_db, document->GetAllocator()); - - Value start_time(state->start_time.DebugString().c_str(), document->GetAllocator()); - session_json.AddMember("start_time", start_time, document->GetAllocator()); - - Value last_accessed( - TimestampValue(session.second->last_accessed_ms / 1000).DebugString().c_str(), - document->GetAllocator()); - session_json.AddMember("last_accessed", last_accessed, document->GetAllocator()); - - session_json.AddMember("session_timeout", state->session_timeout, - document->GetAllocator()); - session_json.AddMember("expired", state->expired, document->GetAllocator()); - session_json.AddMember("closed", state->closed, document->GetAllocator()); - session_json.AddMember("ref_count", state->ref_count, document->GetAllocator()); - sessions.PushBack(session_json, document->GetAllocator()); - } - - document->AddMember("sessions", sessions, document->GetAllocator()); - document->AddMember("num_sessions", static_cast<uint64_t>(session_state_map_.size()), - document->GetAllocator()); -} - -void ImpalaServer::CatalogUrlCallback(const Webserver::ArgumentMap& args, - Document* document) { - TGetDbsResult get_dbs_result; - Status status = exec_env_->frontend()->GetDbs(NULL, NULL, &get_dbs_result); 
- if (!status.ok()) { - Value error(status.GetDetail().c_str(), document->GetAllocator()); - document->AddMember("error", error, document->GetAllocator()); - return; - } - - Value databases(kArrayType); - for (const TDatabase& db: get_dbs_result.dbs) { - Value database(kObjectType); - Value str(db.db_name.c_str(), document->GetAllocator()); - database.AddMember("name", str, document->GetAllocator()); - - TGetTablesResult get_table_results; - Status status = - exec_env_->frontend()->GetTableNames(db.db_name, NULL, NULL, &get_table_results); - if (!status.ok()) { - Value error(status.GetDetail().c_str(), document->GetAllocator()); - database.AddMember("error", error, document->GetAllocator()); - continue; - } - - Value table_array(kArrayType); - for (const string& table: get_table_results.tables) { - Value table_obj(kObjectType); - Value fq_name(Substitute("$0.$1", db.db_name, table).c_str(), - document->GetAllocator()); - table_obj.AddMember("fqtn", fq_name, document->GetAllocator()); - Value table_name(table.c_str(), document->GetAllocator()); - table_obj.AddMember("name", table_name, document->GetAllocator()); - table_array.PushBack(table_obj, document->GetAllocator()); - } - database.AddMember("num_tables", table_array.Size(), document->GetAllocator()); - database.AddMember("tables", table_array, document->GetAllocator()); - databases.PushBack(database, document->GetAllocator()); - } - document->AddMember("databases", databases, document->GetAllocator()); -} - -void ImpalaServer::CatalogObjectsUrlCallback(const Webserver::ArgumentMap& args, - Document* document) { - Webserver::ArgumentMap::const_iterator object_type_arg = args.find("object_type"); - Webserver::ArgumentMap::const_iterator object_name_arg = args.find("object_name"); - if (object_type_arg != args.end() && object_name_arg != args.end()) { - TCatalogObjectType::type object_type = - TCatalogObjectTypeFromName(object_type_arg->second); - - // Get the object type and name from the topic entry key - 
TCatalogObject request; - TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request); - - // Get the object and dump its contents. - TCatalogObject result; - Status status = exec_env_->frontend()->GetCatalogObject(request, &result); - if (status.ok()) { - Value debug_string(ThriftDebugString(result).c_str(), document->GetAllocator()); - document->AddMember("thrift_string", debug_string, document->GetAllocator()); - } else { - Value error(status.GetDetail().c_str(), document->GetAllocator()); - document->AddMember("error", error, document->GetAllocator()); - } - } else { - Value error("Please specify values for the object_type and object_name parameters.", - document->GetAllocator()); - document->AddMember("error", error, document->GetAllocator()); - } -} - -// Helper for PlanToJson(), processes a single list of plan nodes which are the -// DFS-flattened representation of a single plan fragment. Called recursively, the -// iterator parameter is updated in place so that when a recursive call returns, the -// caller is pointing at the next of its children. -void PlanToJsonHelper(const map<TPlanNodeId, TPlanNodeExecSummary>& summaries, - const vector<TPlanNode>& nodes, - vector<TPlanNode>::const_iterator* it, rapidjson::Document* document, Value* value) { - Value children(kArrayType); - value->AddMember("label", (*it)->label.c_str(), document->GetAllocator()); - // Node "details" may contain exprs which should be redacted. 
- Value label_detail(RedactCopy((*it)->label_detail).c_str(), document->GetAllocator()); - value->AddMember("label_detail", label_detail, document->GetAllocator()); - - TPlanNodeId id = (*it)->node_id; - map<TPlanNodeId, TPlanNodeExecSummary>::const_iterator summary = summaries.find(id); - if (summary != summaries.end()) { - int64_t cardinality = 0; - int64_t max_time = 0L; - int64_t total_time = 0; - for (const TExecStats& stat: summary->second.exec_stats) { - if (summary->second.is_broadcast) { - // Avoid multiple-counting for recipients of broadcasts. - cardinality = ::max(cardinality, stat.cardinality); - } else { - cardinality += stat.cardinality; - } - total_time += stat.latency_ns; - max_time = ::max(max_time, stat.latency_ns); - } - value->AddMember("output_card", cardinality, document->GetAllocator()); - value->AddMember("num_instances", - static_cast<uint64_t>(summary->second.exec_stats.size()), - document->GetAllocator()); - if (summary->second.is_broadcast) { - value->AddMember("is_broadcast", true, document->GetAllocator()); - } - - const string& max_time_str = PrettyPrinter::Print(max_time, TUnit::TIME_NS); - Value max_time_str_json(max_time_str.c_str(), document->GetAllocator()); - value->AddMember("max_time", max_time_str_json, document->GetAllocator()); - value->AddMember("max_time_val", max_time, document->GetAllocator()); - - // Round to the nearest ns, to workaround a bug in pretty-printing a fraction of a - // ns. See IMPALA-1800. - const string& avg_time_str = PrettyPrinter::Print( - // A bug may occasionally cause 1-instance nodes to appear to have 0 instances. 
- total_time / ::max(static_cast<int>(summary->second.exec_stats.size()), 1), - TUnit::TIME_NS); - Value avg_time_str_json(avg_time_str.c_str(), document->GetAllocator()); - value->AddMember("avg_time", avg_time_str_json, document->GetAllocator()); - } - - int num_children = (*it)->num_children; - for (int i = 0; i < num_children; ++i) { - ++(*it); - Value container(kObjectType); - PlanToJsonHelper(summaries, nodes, it, document, &container); - children.PushBack(container, document->GetAllocator()); - } - value->AddMember("children", children, document->GetAllocator()); -} - -// Helper method which converts a list of plan fragments into a single JSON document, with -// the following schema: -// "plan_nodes": [ -// { -// "label": "12:AGGREGATE", -// "label_detail": "FINALIZE", -// "output_card": 23456, -// "num_instances": 34, -// "max_time": "1m23s", -// "avg_time": "1.3ms", -// "children": [ -// { -// "label": "11:EXCHANGE", -// "label_detail": "UNPARTITIONED", -// "children": [] -// } -// ] -// }, -// { -// "label": "07:AGGREGATE", -// "label_detail": "", -// "children": [], -// "data_stream_target": "11:EXCHANGE" -// } -// ] -void PlanToJson(const vector<TPlanFragment>& fragments, const TExecSummary& summary, - rapidjson::Document* document, Value* value) { - // Build a map from id to label so that we can resolve the targets of data-stream sinks - // and connect plan fragments. 
- map<TPlanNodeId, string> label_map; - for (const TPlanFragment& fragment: fragments) { - for (const TPlanNode& node: fragment.plan.nodes) { - label_map[node.node_id] = node.label; - } - } - - map<TPlanNodeId, TPlanNodeExecSummary> exec_summaries; - for (const TPlanNodeExecSummary& s: summary.nodes) { - exec_summaries[s.node_id] = s; - } - - Value nodes(kArrayType); - for (const TPlanFragment& fragment: fragments) { - Value plan_fragment(kObjectType); - vector<TPlanNode>::const_iterator it = fragment.plan.nodes.begin(); - PlanToJsonHelper(exec_summaries, fragment.plan.nodes, &it, document, &plan_fragment); - if (fragment.__isset.output_sink) { - const TDataSink& sink = fragment.output_sink; - if (sink.__isset.stream_sink) { - plan_fragment.AddMember("data_stream_target", - label_map[sink.stream_sink.dest_node_id].c_str(), document->GetAllocator()); - } - } - nodes.PushBack(plan_fragment, document->GetAllocator()); - } - value->AddMember("plan_nodes", nodes, document->GetAllocator()); -} - -void ImpalaServer::QuerySummaryCallback(bool include_json_plan, bool include_summary, - const Webserver::ArgumentMap& args, Document* document) { - TUniqueId query_id; - Status status = ParseQueryId(args, &query_id); - if (!status.ok()) { - // Redact the error message, it may contain part or all of the query. - Value json_error(RedactCopy(status.GetDetail()).c_str(), document->GetAllocator()); - document->AddMember("error", json_error, document->GetAllocator()); - return; - } - - TExecSummary summary; - string stmt; - string plan; - Status query_status; - bool found = false; - vector<TPlanFragment> fragments; - - // Search the in-flight queries first, followed by the archived ones. 
- { - shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true); - if (exec_state != NULL) { - found = true; - lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t()); - if (exec_state->coord() == NULL) { - const string& err = Substitute("Invalid query id: $0", PrintId(query_id)); - Value json_error(err.c_str(), document->GetAllocator()); - document->AddMember("error", json_error, document->GetAllocator()); - return; - } - query_status = exec_state->query_status(); - stmt = exec_state->sql_stmt(); - plan = exec_state->exec_request().query_exec_request.query_plan; - if (include_json_plan || include_summary) { - lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock()); - summary = exec_state->coord()->exec_summary(); - } - if (include_json_plan) { - fragments = exec_state->exec_request().query_exec_request.fragments; - } - } - } - - if (!found) { - lock_guard<mutex> l(query_log_lock_); - QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id); - if (query_record == query_log_index_.end()) { - const string& err = Substitute("Unknown query id: $0", PrintId(query_id)); - Value json_error(err.c_str(), document->GetAllocator()); - document->AddMember("error", json_error, document->GetAllocator()); - return; - } - if (include_json_plan || include_summary) { - summary = query_record->second->exec_summary; - } - stmt = query_record->second->stmt; - plan = query_record->second->plan; - query_status = query_record->second->query_status; - if (include_json_plan) { - fragments = query_record->second->fragments; - } - } - - if (include_json_plan) { - Value v(kObjectType); - PlanToJson(fragments, summary, document, &v); - document->AddMember("plan_json", v, document->GetAllocator()); - } - if (include_summary) { - const string& printed_summary = PrintExecSummary(summary); - Value json_summary(printed_summary.c_str(), document->GetAllocator()); - document->AddMember("summary", json_summary, document->GetAllocator()); - } - Value 
json_stmt(RedactCopy(stmt).c_str(), document->GetAllocator()); - document->AddMember("stmt", json_stmt, document->GetAllocator()); - Value json_plan_text(RedactCopy(plan).c_str(), document->GetAllocator()); - document->AddMember("plan", json_plan_text, document->GetAllocator()); - - // Redact the error in case the query is contained in the error message. - Value json_status(query_status.ok() ? "OK" : - RedactCopy(query_status.GetDetail()).c_str(), document->GetAllocator()); - document->AddMember("status", json_status, document->GetAllocator()); - Value json_id(PrintId(query_id).c_str(), document->GetAllocator()); - document->AddMember("query_id", json_id, document->GetAllocator()); -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1af2455/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 8f62171..771d5b0 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -53,6 +53,7 @@ #include "runtime/tmp-file-mgr.h" #include "service/fragment-exec-state.h" #include "service/impala-internal-service.h" +#include "service/impala-http-handler.h" #include "service/query-exec-state.h" #include "scheduling/simple-scheduler.h" #include "util/bit-util.h" @@ -312,7 +313,8 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env) ERR_load_crypto_strings(); } - RegisterWebserverCallbacks(exec_env->webserver()); + http_handler_.reset(new ImpalaHttpHandler(this)); + http_handler_->RegisterHandlers(exec_env->webserver()); // Initialize impalad metrics ImpaladMetrics::CreateMetrics(exec_env->metrics()->GetChildGroup("impala-server")); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1af2455/be/src/service/impala-server.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index ce30725..6a07ccd 100644 --- 
a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -42,7 +42,6 @@ #include "runtime/runtime-state.h" #include "runtime/timestamp-value.h" #include "runtime/types.h" -#include "rapidjson/rapidjson.h" namespace impala { @@ -50,6 +49,7 @@ class ExecEnv; class DataSink; class CancellationWork; class Coordinator; +class ImpalaHttpHandler; class RowDescriptor; class TCatalogUpdate; class TPlanExecRequest; @@ -249,6 +249,9 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, private: friend class ChildQuery; + friend class ImpalaHttpHandler; + + boost::scoped_ptr<ImpalaHttpHandler> http_handler_; /// Query result set stores converted rows returned by QueryExecState.fetchRows(). It /// provides an interface to convert Impala rows to external API rows. @@ -393,109 +396,11 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// Returns the exec summary for this query. Status GetExecSummary(const TUniqueId& query_id, TExecSummary* result); - /// Json callback for /hadoop-varz. Produces Json with a list, 'configs', of (key, value) - /// pairs, one for each Hadoop configuration value. - void HadoopVarzUrlCallback(const Webserver::ArgumentMap& args, - rapidjson::Document* document); - - /// Webserver callback. Returns two sorted lists of queries, one in-flight and one - /// completed, as well as a list of active backends and their plan-fragment count. 
- // - /// "in_flight_queries": [], - /// "num_in_flight_queries": 0, - /// "completed_queries": [ - /// { - /// "effective_user": "henry", - /// "default_db": "default", - /// "stmt": "select sleep(10000)", - /// "stmt_type": "QUERY", - /// "start_time": "2014-08-07 18:37:47.923614000", - /// "end_time": "2014-08-07 18:37:58.146494000", - /// "progress": "0 / 0 (0%)", - /// "state": "FINISHED", - /// "rows_fetched": 1, - /// "query_id": "7c459a59fb8cefe3:8b7042d55bf19887" - /// } - /// ], - /// "completed_log_size": 25, - /// "query_locations": [ - /// { - /// "location": "henry-impala:22000", - /// "count": 0 - /// } - /// ] - void QueryStateUrlCallback(const Webserver::ArgumentMap& args, - rapidjson::Document* document); - - /// Json callback for /query_profile. Expects query_id as an argument, produces Json with - /// 'profile' set to the profile string, and 'query_id' set to the query ID. - void QueryProfileUrlCallback(const Webserver::ArgumentMap& args, - rapidjson::Document* document); - - /// Webserver callback. Produces a Json structure with query summary information. - /// Example: - /// { "summary": <....>, - /// "plan": <....>, - /// "stmt": "select count(*) from functional.alltypes" - /// "id": <...>, - /// "state": "FINISHED" - /// } - /// If include_plan_json is true, 'plan_json' will be set to a JSON representation of the - /// query plan. If include_summary is true, 'summary' will be a text rendering of the - /// query summary. - void QuerySummaryCallback(bool include_plan_json, bool include_summary, - const Webserver::ArgumentMap& args, rapidjson::Document* document); - - /// Webserver callback. Cancels an in-flight query and writes the result to 'contents'. - void CancelQueryUrlCallback(const Webserver::ArgumentMap& args, - rapidjson::Document* document); - - /// Webserver callback. Upon return, 'document' will contain the query profile as a - /// base64 encoded object in 'contents'. 
- void QueryProfileEncodedUrlCallback(const Webserver::ArgumentMap& args, - rapidjson::Document* document); - - /// Webserver callback. Produces a list of inflight query IDs printed as text in - /// 'contents'. - void InflightQueryIdsUrlCallback(const Webserver::ArgumentMap& args, - rapidjson::Document* document); - - /// Json callback for /sessions, which prints a table of active client sessions. - /// "sessions": [ - /// { - /// "type": "BEESWAX", - /// "num_queries": 0, - /// "user": "", - /// "delegated_user": "", - /// "session_id": "6242f69b02e4d609:ac84df1fbb0e16a3", - /// "network_address": "127.0.0.1:46484", - /// "default_database": "default", - /// "start_time": "2014-08-07 22:50:49", - /// "last_accessed": "2014-08-07 22:50:49", - /// "expired": false, - /// "closed": false, - /// "ref_count": 0 - /// } - /// ], - /// "num_sessions": 1 - void SessionsUrlCallback(const Webserver::ArgumentMap& args, - rapidjson::Document* document); - - /// Webserver callback that prints a list of all known databases and tables - void CatalogUrlCallback(const Webserver::ArgumentMap& args, rapidjson::Document* output); - - /// Webserver callback that allows for dumping information on objects in the catalog. - void CatalogObjectsUrlCallback(const Webserver::ArgumentMap& args, - rapidjson::Document* output); - /// Initialize "default_configs_" to show the default values for ImpalaQueryOptions and /// "support_start_over/false" to indicate that Impala does not support start over /// in the fetch call. void InitializeConfigVariables(); - /// Registers all the per-Impalad webserver callbacks - void RegisterWebserverCallbacks(Webserver* webserver); - /// Checks settings for profile logging, including whether the output /// directory exists and is writeable, and initialises the first log file. 
/// Returns OK unless there is some problem preventing profile log files @@ -629,11 +534,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, bool operator() (const QueryStateRecord& lhs, const QueryStateRecord& rhs) const; }; - /// Helper method to render a single QueryStateRecord as a Json object - /// Used by QueryStateUrlCallback(). - void QueryStateToJson(const ImpalaServer::QueryStateRecord& record, - rapidjson::Value* value, rapidjson::Document* document); - /// Beeswax private methods /// Helper functions to translate between Beeswax and Impala structs
