http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/query-exec-state.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.h b/be/src/service/query-exec-state.h deleted file mode 100644 index 8ac066e..0000000 --- a/be/src/service/query-exec-state.h +++ /dev/null @@ -1,408 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_SERVICE_QUERY_EXEC_STATE_H -#define IMPALA_SERVICE_QUERY_EXEC_STATE_H - -#include "common/status.h" -#include "exec/catalog-op-executor.h" -#include "util/runtime-profile.h" -#include "runtime/timestamp-value.h" -#include "service/child-query.h" -#include "scheduling/query-schedule.h" -#include "gen-cpp/Frontend_types.h" -#include "service/impala-server.h" -#include "gen-cpp/Frontend_types.h" -#include "util/auth-util.h" - -#include <boost/thread.hpp> -#include <boost/unordered_set.hpp> -#include <vector> - -namespace impala { - -class ExecEnv; -class Coordinator; -class RuntimeState; -class RowBatch; -class Expr; -class TupleRow; -class Frontend; -class QueryExecStateCleaner; - -/// Execution state of a query. 
This captures everything necessary -/// to convert row batches received by the coordinator into results -/// we can return to the client. It also captures all state required for -/// servicing query-related requests from the client. -/// Thread safety: this class is generally not thread-safe, callers need to -/// synchronize access explicitly via lock(). See the ImpalaServer class comment for -/// the required lock acquisition order. -/// -/// TODO: Consider renaming to RequestExecState for consistency. -/// TODO: Compute stats is the only stmt that requires child queries. Once the -/// CatalogService performs background stats gathering the concept of child queries -/// will likely become obsolete. Remove all child-query related code from this class. -class ImpalaServer::QueryExecState { - public: - QueryExecState(const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend, - ImpalaServer* server, std::shared_ptr<ImpalaServer::SessionState> session); - - ~QueryExecState(); - - /// Initiates execution of a exec_request. - /// Non-blocking. - /// Must *not* be called with lock_ held. - Status Exec(TExecRequest* exec_request); - - /// Execute a HiveServer2 metadata operation - /// TODO: This is likely a superset of GetTableNames/GetDbs. Coalesce these different - /// code paths. - Status Exec(const TMetadataOpRequest& exec_request); - - /// Call this to ensure that rows are ready when calling FetchRows(). Updates the - /// query_status_, and advances query_state_ to FINISHED or EXCEPTION. Must be preceded - /// by call to Exec(). Waits for all child queries to complete. Takes lock_. - void Wait(); - - /// Calls Wait() asynchronously in a thread and returns immediately. - void WaitAsync(); - - /// BlockOnWait() may be called after WaitAsync() has been called in order to wait - /// for the asynchronous thread (wait_thread_) to complete. 
It is safe to call this - /// from multiple threads (all threads will block until wait_thread_ has completed) - /// and multiple times (non-blocking once wait_thread_ has completed). Do not call - /// while holding lock_. - void BlockOnWait(); - - /// Return at most max_rows from the current batch. If the entire current batch has - /// been returned, fetch another batch first. - /// Caller needs to hold fetch_rows_lock_ and lock_. - /// Caller should verify that EOS has not be reached before calling. - /// Must be preceeded by call to Wait() (or WaitAsync()/BlockOnWait()). - /// Also updates query_state_/status_ in case of error. - Status FetchRows(const int32_t max_rows, QueryResultSet* fetched_rows); - - /// Resets the state of this query such that the next fetch() returns results from the - /// beginning of the query result set (by using the using result_cache_). - /// It is valid to call this function for any type of statement that returns a result - /// set, including queries, show stmts, compute stats, etc. - /// Returns a recoverable error status if the restart is not possible, ok() otherwise. - /// The error is recoverable to allow clients to resume fetching. - /// The caller must hold fetch_rows_lock_ and lock_. - Status RestartFetch(); - - /// Update query state if the requested state isn't already obsolete. This is only for - /// non-error states - if the query encounters an error the query status needs to be set - /// with information about the error so UpdateQueryStatus must be used instead. - /// Takes lock_. - void UpdateNonErrorQueryState(beeswax::QueryState::type query_state); - - /// Update the query status and the "Query Status" summary profile string. - /// If current status is already != ok, no update is made (we preserve the first error) - /// If called with a non-ok argument, the expectation is that the query will be aborted - /// quickly. 
- /// Returns the status argument (so we can write - /// RETURN_IF_ERROR(UpdateQueryStatus(SomeOperation())). - /// Does not take lock_, but requires it: caller must ensure lock_ - /// is taken before calling UpdateQueryStatus - Status UpdateQueryStatus(const Status& status); - - /// Cancels the child queries and the coordinator with the given cause. - /// If cause is NULL, assume this was deliberately cancelled by the user. - /// Otherwise, sets state to EXCEPTION. - /// Does nothing if the query has reached EOS or already cancelled. - /// - /// Only returns an error if 'check_inflight' is true and the query is not yet - /// in-flight. Otherwise, proceed and return Status::OK() even if the query isn't - /// in-flight (for cleaning up after an error on the query issuing path). - Status Cancel(bool check_inflight, const Status* cause); - - /// This is called when the query is done (finished, cancelled, or failed). - /// Takes lock_: callers must not hold lock() before calling. - void Done(); - - /// Sets the API-specific (Beeswax, HS2) result cache and its size bound. - /// The given cache is owned by this query exec state, even if an error is returned. - /// Returns a non-ok status if max_size exceeds the per-impalad allowed maximum. - Status SetResultCache(QueryResultSet* cache, int64_t max_size); - - ImpalaServer::SessionState* session() const { return session_.get(); } - - /// Queries are run and authorized on behalf of the effective_user. 
- const std::string& effective_user() const { - return GetEffectiveUser(query_ctx_.session); - } - const std::string& connected_user() const { return query_ctx_.session.connected_user; } - const std::string& do_as_user() const { return query_ctx_.session.delegated_user; } - TSessionType::type session_type() const { return query_ctx_.session.session_type; } - const TUniqueId& session_id() const { return query_ctx_.session.session_id; } - const std::string& default_db() const { return query_ctx_.session.database; } - bool eos() const { return eos_; } - Coordinator* coord() const { return coord_.get(); } - QuerySchedule* schedule() { return schedule_.get(); } - - /// Resource pool associated with this query, or an empty string if the schedule has not - /// been created and had the pool set yet, or this StmtType doesn't go through admission - /// control. - std::string request_pool() const { - return schedule_ == nullptr ? "" : schedule_->request_pool(); - } - int num_rows_fetched() const { return num_rows_fetched_; } - void set_fetched_rows() { fetched_rows_ = true; } - bool fetched_rows() const { return fetched_rows_; } - bool returns_result_set() { return !result_metadata_.columns.empty(); } - const TResultSetMetadata* result_metadata() { return &result_metadata_; } - const TUniqueId& query_id() const { return query_ctx_.query_id; } - const TExecRequest& exec_request() const { return exec_request_; } - TStmtType::type stmt_type() const { return exec_request_.stmt_type; } - TCatalogOpType::type catalog_op_type() const { - return exec_request_.catalog_op_request.op_type; - } - TDdlType::type ddl_type() const { - return exec_request_.catalog_op_request.ddl_params.ddl_type; - } - boost::mutex* lock() { return &lock_; } - boost::mutex* fetch_rows_lock() { return &fetch_rows_lock_; } - beeswax::QueryState::type query_state() const { return query_state_; } - const Status& query_status() const { return query_status_; } - void set_result_metadata(const TResultSetMetadata& 
md) { result_metadata_ = md; } - const RuntimeProfile& profile() const { return profile_; } - const RuntimeProfile& summary_profile() const { return summary_profile_; } - const TimestampValue& start_time() const { return start_time_; } - const TimestampValue& end_time() const { return end_time_; } - const std::string& sql_stmt() const { return query_ctx_.client_request.stmt; } - const TQueryOptions& query_options() const { - return query_ctx_.client_request.query_options; - } - /// Returns 0:0 if this is a root query - TUniqueId parent_query_id() const { return query_ctx_.parent_query_id; } - - const std::vector<std::string>& GetAnalysisWarnings() const { - return exec_request_.analysis_warnings; - } - - inline int64_t last_active_ms() const { - boost::lock_guard<boost::mutex> l(expiration_data_lock_); - return last_active_time_ms_; - } - - /// Returns true if Impala is actively processing this query. - inline bool is_active() const { - boost::lock_guard<boost::mutex> l(expiration_data_lock_); - return ref_count_ > 0; - } - - RuntimeProfile::EventSequence* query_events() const { return query_events_; } - RuntimeProfile* summary_profile() { return &summary_profile_; } - - private: - const TQueryCtx query_ctx_; - - /// Ensures single-threaded execution of FetchRows(). Callers of FetchRows() are - /// responsible for acquiring this lock. To avoid deadlocks, callers must not hold lock_ - /// while acquiring this lock (since FetchRows() will release and re-acquire lock_ during - /// its execution). - /// See "Locking" in the class comment for lock acquisition order. - boost::mutex fetch_rows_lock_; - - /// Protects last_active_time_ms_ and ref_count_. Only held during short function calls - /// - no other locks should be acquired while holding this lock. - mutable boost::mutex expiration_data_lock_; - - /// Stores the last time that the query was actively doing work, in Unix milliseconds. 
- int64_t last_active_time_ms_; - - /// ref_count_ > 0 if Impala is currently performing work on this query's behalf. Every - /// time a client instructs Impala to do work on behalf of this query, the ref count is - /// increased, and decreased once that work is completed. - uint32_t ref_count_; - - /// Executor for any child queries (e.g. compute stats subqueries). Always non-NULL. - const boost::scoped_ptr<ChildQueryExecutor> child_query_executor_; - - /// Protects all following fields. Acquirers should be careful not to hold it for too - /// long, e.g. during RPCs because this lock is required to make progress on various - /// ImpalaServer requests. If held for too long it can block progress of client - /// requests for this query, e.g. query status and cancellation. Furthermore, until - /// IMPALA-3882 is fixed, it can indirectly block progress on all other queries. - /// See "Locking" in the class comment for lock acquisition order. - boost::mutex lock_; - - ExecEnv* exec_env_; - - /// Thread for asynchronously running Wait(). - boost::scoped_ptr<Thread> wait_thread_; - - /// Condition variable to make BlockOnWait() thread-safe. One thread joins - /// wait_thread_, and all other threads block on this cv. Used with lock_. - boost::condition_variable block_on_wait_cv_; - - /// Used in conjunction with block_on_wait_cv_ to make BlockOnWait() thread-safe. - bool is_block_on_wait_joining_; - - /// Session that this query is from - std::shared_ptr<SessionState> session_; - - /// Resource assignment determined by scheduler. Owned by obj_pool_. - boost::scoped_ptr<QuerySchedule> schedule_; - - /// Not set for ddl queries. - boost::scoped_ptr<Coordinator> coord_; - - /// Runs statements that query or modify the catalog via the CatalogService. - boost::scoped_ptr<CatalogOpExecutor> catalog_op_executor_; - - /// Result set used for requests that return results and are not QUERY - /// statements. For example, EXPLAIN, LOAD, and SHOW use this. 
- boost::scoped_ptr<std::vector<TResultRow>> request_result_set_; - - /// Cache of the first result_cache_max_size_ query results to allow clients to restart - /// fetching from the beginning of the result set. This cache is appended to in - /// FetchInternal(), and set to NULL if its bound is exceeded. If the bound is exceeded, - /// then clients cannot restart fetching because some results have been lost since the - /// last fetch. Only set if result_cache_max_size_ > 0. - boost::scoped_ptr<QueryResultSet> result_cache_; - - /// Max size of the result_cache_ in number of rows. A value <= 0 means no caching. - int64_t result_cache_max_size_; - - ObjectPool profile_pool_; - - /// The QueryExecState builds three separate profiles. - /// * profile_ is the top-level profile which houses the other - /// profiles, plus the query timeline - /// * summary_profile_ contains mostly static information about the - /// query, including the query statement, the plan and the user who submitted it. - /// * server_profile_ tracks time spent inside the ImpalaServer, - /// but not inside fragment execution, i.e. the time taken to - /// register and set-up the query and for rows to be fetched. - // - /// There's a fourth profile which is not built here (but is a - /// child of profile_); the execution profile which tracks the - /// actual fragment execution. - RuntimeProfile profile_; - RuntimeProfile server_profile_; - RuntimeProfile summary_profile_; - RuntimeProfile::Counter* row_materialization_timer_; - - /// Tracks how long we are idle waiting for a client to fetch rows. - RuntimeProfile::Counter* client_wait_timer_; - /// Timer to track idle time for the above counter. - MonotonicStopWatch client_wait_sw_; - - RuntimeProfile::EventSequence* query_events_; - - bool is_cancelled_; // if true, Cancel() was called. 
- bool eos_; // if true, there are no more rows to return - // We enforce the invariant that query_status_ is not OK iff query_state_ - // is EXCEPTION, given that lock_ is held. - beeswax::QueryState::type query_state_; - Status query_status_; - TExecRequest exec_request_; - - TResultSetMetadata result_metadata_; // metadata for select query - RowBatch* current_batch_; // the current row batch; only applicable if coord is set - int current_batch_row_; // number of rows fetched within the current batch - int num_rows_fetched_; // number of rows fetched by client for the entire query - - /// True if a fetch was attempted by a client, regardless of whether a result set - /// (or error) was returned to the client. - bool fetched_rows_; - - /// To get access to UpdateCatalog, LOAD, and DDL methods. Not owned. - Frontend* frontend_; - - /// The parent ImpalaServer; called to wait until the the impalad has processed a - /// catalog update request. Not owned. - ImpalaServer* parent_server_; - - /// Start/end time of the query - TimestampValue start_time_, end_time_; - - /// Executes a local catalog operation (an operation that does not need to execute - /// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements. - Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op); - - /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently not - /// doing any work. Takes expiration_data_lock_. - void MarkInactive(); - - /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently being - /// actively processed. Takes expiration_data_lock_. - void MarkActive(); - - /// Core logic of initiating a query or dml execution request. - /// Initiates execution of plan fragments, if there are any, and sets - /// up the output exprs for subsequent calls to FetchRows(). - /// 'coord_' is only valid after this method is called, and may be invalid if it - /// returns an error. 
- /// Also sets up profile and pre-execution counters. - /// Non-blocking. - Status ExecQueryOrDmlRequest(const TQueryExecRequest& query_exec_request); - - /// Core logic of executing a ddl statement. May internally initiate execution of - /// queries (e.g., compute stats) or dml (e.g., create table as select) - Status ExecDdlRequest(); - - /// Executes a LOAD DATA - Status ExecLoadDataRequest(); - - /// Core logic of Wait(). Does not update query_state_/status_. - Status WaitInternal(); - - /// Core logic of FetchRows(). Does not update query_state_/status_. - /// Caller needs to hold fetch_rows_lock_ and lock_. - Status FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows); - - /// Evaluates 'output_expr_ctxs_' against 'row' and output the evaluated row in - /// 'result'. The values' scales (# of digits after decimal) are stored in 'scales'. - /// result and scales must have been resized to the number of columns before call. - Status GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales); - - /// Gather and publish all required updates to the metastore - Status UpdateCatalog(); - - /// Copies results into request_result_set_ - /// TODO: Have the FE return list<Data.TResultRow> so that this isn't necessary - void SetResultSet(const TDdlExecResponse* ddl_resp); - void SetResultSet(const std::vector<std::string>& results); - void SetResultSet(const std::vector<std::string>& col1, - const std::vector<std::string>& col2); - void SetResultSet(const std::vector<std::string>& col1, - const std::vector<std::string>& col2, const std::vector<std::string>& col3, - const std::vector<std::string>& col4); - - /// Sets the result set for a CREATE TABLE AS SELECT statement. The results will not be - /// ready until all BEs complete execution. This can be called as part of Wait(), - /// at which point results will be avilable. 
- void SetCreateTableAsSelectResultSet(); - - /// Updates the metastore's table and column statistics based on the child-query results - /// of a compute stats command. - /// TODO: Unify the various ways that the Metastore is updated for DDL/DML. - /// For example, INSERT queries update partition metadata in UpdateCatalog() using a - /// TUpdateCatalogRequest, whereas our DDL uses a TCatalogOpRequest for very similar - /// purposes. Perhaps INSERT should use a TCatalogOpRequest as well. - Status UpdateTableAndColumnStats(const std::vector<ChildQuery*>& child_queries); - - /// Sets result_cache_ to NULL and updates its associated metrics and mem consumption. - /// This function is a no-op if the cache has already been cleared. - void ClearResultCache(); -}; - -} -#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/testutil/desc-tbl-builder.cc ---------------------------------------------------------------------- diff --git a/be/src/testutil/desc-tbl-builder.cc b/be/src/testutil/desc-tbl-builder.cc index 77be724..b4b0a1f 100644 --- a/be/src/testutil/desc-tbl-builder.cc +++ b/be/src/testutil/desc-tbl-builder.cc @@ -61,7 +61,7 @@ DescriptorTbl* DescriptorTblBuilder::Build() { DCHECK(buildDescTblStatus.ok()) << buildDescTblStatus.GetDetail(); DescriptorTbl* desc_tbl; - Status status = DescriptorTbl::Create(obj_pool_, thrift_desc_tbl_, &desc_tbl); + Status status = DescriptorTbl::Create(obj_pool_, thrift_desc_tbl_, nullptr, &desc_tbl); DCHECK(status.ok()) << status.GetDetail(); return desc_tbl; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/testutil/fault-injection-util.h ---------------------------------------------------------------------- diff --git a/be/src/testutil/fault-injection-util.h b/be/src/testutil/fault-injection-util.h index c99001c..99816a4 100644 --- a/be/src/testutil/fault-injection-util.h +++ b/be/src/testutil/fault-injection-util.h @@ -30,8 +30,8 @@ namespace impala { #ifndef NDEBUG enum RpcCallType { RPC_NULL = 0, - RPC_EXECPLANFRAGMENT, - RPC_CANCELPLANFRAGMENT, + RPC_EXECQUERYFINSTANCES, + RPC_CANCELQUERYFINSTANCES, RPC_PUBLISHFILTER, RPC_UPDATEFILTER, RPC_TRANSMITDATA, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/util/error-util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/error-util-test.cc b/be/src/util/error-util-test.cc index 2390630..0c940cb 100644 --- a/be/src/util/error-util-test.cc +++ b/be/src/util/error-util-test.cc @@ -47,7 +47,7 @@ TEST(ErrorMsg, MergeMap) { right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.push_back("p"); right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count = 3; - MergeErrorMaps(&left, right); + MergeErrorMaps(right, &left); 
ASSERT_EQ(2, left.size()); ASSERT_EQ(2, left[TErrorCode::GENERAL].messages.size()); @@ -55,7 +55,7 @@ TEST(ErrorMsg, MergeMap) { right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.push_back("p"); right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count = 3; - MergeErrorMaps(&left, right); + MergeErrorMaps(right, &left); ASSERT_EQ(2, left.size()); ASSERT_EQ(2, left[TErrorCode::GENERAL].messages.size()); ASSERT_EQ(6, left[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count); @@ -74,7 +74,7 @@ TEST(ErrorMsg, MergeMap) { ASSERT_EQ(2, cleared.size()); ASSERT_EQ(1, cleared.count(TErrorCode::RPC_RECV_TIMEOUT)); - MergeErrorMaps(&dummy, cleared); + MergeErrorMaps(cleared, &dummy); ASSERT_EQ(3, dummy.size()); ASSERT_EQ(3, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count); ASSERT_EQ(1, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.size()); @@ -84,7 +84,7 @@ TEST(ErrorMsg, MergeMap) { ASSERT_EQ(0, dummy[TErrorCode::GENERAL].count); ASSERT_EQ(1, dummy[TErrorCode::GENERAL].messages.size()); - MergeErrorMaps(&cleared, dummy); + MergeErrorMaps(dummy, &cleared); ASSERT_EQ(3, cleared.size()); ASSERT_EQ(3, cleared[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count); ASSERT_EQ(1, cleared[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.size()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/util/error-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/error-util.cc b/be/src/util/error-util.cc index db9aef6..69a0355 100644 --- a/be/src/util/error-util.cc +++ b/be/src/util/error-util.cc @@ -159,9 +159,9 @@ string PrintErrorMapToString(const ErrorLogMap& errors) { return stream.str(); } -void MergeErrorMaps(ErrorLogMap* left, const ErrorLogMap& right) { - for (const ErrorLogMap::value_type& v: right) { - TErrorLogEntry& target = (*left)[v.first]; +void MergeErrorMaps(const ErrorLogMap& m1, ErrorLogMap* m2) { + for (const ErrorLogMap::value_type& v: m1) { + TErrorLogEntry& target = (*m2)[v.first]; const TErrorLogEntry& source 
= v.second; // Append generic message, append specific codes or increment count if exists if (v.first == TErrorCode::GENERAL) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/util/error-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/error-util.h b/be/src/util/error-util.h index f245c2c..c366ab2 100644 --- a/be/src/util/error-util.h +++ b/be/src/util/error-util.h @@ -139,11 +139,11 @@ private: /// Track log messages per error code. typedef std::map<TErrorCode::type, TErrorLogEntry> ErrorLogMap; -/// Merge error maps. Merging of error maps occurs, when the errors from multiple backends -/// are merged into a single error map. General log messages are simply appended, -/// specific errors are deduplicated by either appending a new instance or incrementing -/// the count of an existing one. -void MergeErrorMaps(ErrorLogMap* left, const ErrorLogMap& right); +/// Merge error map m1 into m2. Merging of error maps occurs when the errors from +/// multiple backends are merged into a single error map. General log messages are +/// simply appended, specific errors are deduplicated by either appending a new +/// instance or incrementing the count of an existing one. +void MergeErrorMaps(const ErrorLogMap& m1, ErrorLogMap* m2); /// Append an error to the error map. 
Performs the aggregation as follows: GENERAL errors /// are appended to the list of GENERAL errors, to keep one item each in the map, while http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/util/uid-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h index 37f804a..1f57298 100644 --- a/be/src/util/uid-util.h +++ b/be/src/util/uid-util.h @@ -78,6 +78,10 @@ inline int32_t GetInstanceIdx(const TUniqueId& fragment_instance_id) { return fragment_instance_id.lo & FRAGMENT_IDX_MASK; } +inline bool IsValidFInstanceId(const TUniqueId& fragment_instance_id) { + return fragment_instance_id.hi != 0L; +} + inline TUniqueId CreateInstanceId( const TUniqueId& query_id, int32_t instance_idx) { DCHECK_EQ(GetInstanceIdx(query_id), 0); // well-formed query id http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/common/thrift/ExecStats.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ExecStats.thrift b/common/thrift/ExecStats.thrift index 8e88b20..68a8fd8 100644 --- a/common/thrift/ExecStats.thrift +++ b/common/thrift/ExecStats.thrift @@ -43,6 +43,7 @@ struct TExecStats { // Total CPU time spent across all threads. For operators that have an async // component (e.g. multi-threaded) this will be >= latency_ns. + // TODO-MT: remove this or latency_ns 2: optional i64 cpu_time_ns // Number of rows returned. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 433f3b4..b17aeec 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -280,8 +280,17 @@ struct TClientRequest { 3: optional string redacted_stmt } +// Debug options: perform some action in a particular phase of a particular node +// TODO: find a better name +struct TDebugOptions { + 1: optional Types.TPlanNodeId node_id + 2: optional PlanNodes.TExecNodePhase phase + 3: optional PlanNodes.TDebugAction action +} + // Context of this query, including the client request, session state and // global query parameters needed for consistent expr evaluation (e.g., now()). +// // TODO: Separate into FE/BE initialized vars. struct TQueryCtx { // Client request containing stmt to execute and query options. @@ -326,8 +335,7 @@ struct TQueryCtx { // This defaults to -1 when no timestamp is specified. 11: optional i64 snapshot_timestamp = -1; - // Contains only the union of those descriptors referenced by list of fragments destined - // for a single host. Optional for frontend tests. + // Optional for frontend tests. 12: optional Descriptors.TDescriptorTable desc_tbl // Milliseconds since UNIX epoch at the start of query execution. @@ -340,12 +348,31 @@ struct TQueryCtx { // List of tables with scan ranges that map to blocks with missing disk IDs. 15: optional list<CatalogObjects.TTableName> tables_missing_diskids + + // The pool to which this request has been submitted. Used to update pool statistics + // for admission control. 
+ 16: optional string request_pool +} + +// Specification of one output destination of a plan fragment +struct TPlanFragmentDestination { + // the globally unique fragment instance id + 1: required Types.TUniqueId fragment_instance_id + + // ... which is being executed on this server + 2: required Types.TNetworkAddress server } // Context to collect information, which is shared among all instances of that plan // fragment. struct TPlanFragmentCtx { 1: required Planner.TPlanFragment fragment + + // Output destinations, one per output partition. + // The partitioning of the output is specified by + // TPlanFragment.output_sink.output_partition. + // The number of output partitions is destinations.size(). + 2: list<TPlanFragmentDestination> destinations } // A scan range plus the parameters needed to execute that scan. @@ -356,56 +383,37 @@ struct TScanRangeParams { 4: optional bool is_remote } -// Specification of one output destination of a plan fragment -struct TPlanFragmentDestination { - // the globally unique fragment instance id - 1: required Types.TUniqueId fragment_instance_id - - // ... which is being executed on this server - 2: required Types.TNetworkAddress server -} - -// Execution parameters of a fragment instance, including its unique id, the total number -// of fragment instances, the query context, the coordinator address, etc. -// TODO: for range partitioning, we also need to specify the range boundaries +// Execution parameters of a single fragment instance. struct TPlanFragmentInstanceCtx { + // TPlanFragment.idx + 1: required Types.TFragmentIdx fragment_idx + // The globally unique fragment instance id. // Format: query id + query-wide fragment instance index - // The query-wide fragment instance index starts at 0, so that the query id - // and the id of the first fragment instance are identical. + // The query-wide fragment instance index enumerates all fragment instances of a + // particular query. 
It starts at 0, so that the query id and the id of the first + // fragment instance are identical. // If there is a coordinator instance, it is the first one, with index 0. - 1: required Types.TUniqueId fragment_instance_id + // Range: [0, TExecQueryFInstancesParams.fragment_instance_ctxs.size()-1] + 2: required Types.TUniqueId fragment_instance_id - // Index of this fragment instance accross all instances of its parent fragment, - // range [0, TPlanFragmentCtx.num_fragment_instances). - 2: required i32 per_fragment_instance_idx + // Index of this fragment instance across all instances of its parent fragment + // (TPlanFragment with idx = TPlanFragmentInstanceCtx.fragment_idx). + // Range: [0, <# of instances of parent fragment> - 1] + 3: required i32 per_fragment_instance_idx // Initial scan ranges for each scan node in TPlanFragment.plan_tree - 3: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges + 4: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges // Number of senders for ExchangeNodes contained in TPlanFragment.plan_tree; // needed to create a DataStreamRecvr // TODO for per-query exec rpc: move these to TPlanFragmentCtx - 4: required map<Types.TPlanNodeId, i32> per_exch_num_senders + 5: required map<Types.TPlanNodeId, i32> per_exch_num_senders - // Output destinations, one per output partition. - // The partitioning of the output is specified by - // TPlanFragment.output_sink.output_partition. - // The number of output partitions is destinations.size(). - // TODO for per-query exec rpc: move these to TPlanFragmentCtx - 5: list<TPlanFragmentDestination> destinations - - // Debug options: perform some action in a particular phase of a particular node - 6: optional Types.TPlanNodeId debug_node_id - 7: optional PlanNodes.TExecNodePhase debug_phase - 8: optional PlanNodes.TDebugAction debug_action - - // The pool to which this request has been submitted. 
Used to update pool statistics - // for admission control. - 9: optional string request_pool + // Id of this instance in its role as a sender. + 6: optional i32 sender_id - // Id of this fragment in its role as a sender. - 10: optional i32 sender_id + 7: optional TDebugOptions debug_options } @@ -415,29 +423,37 @@ enum ImpalaInternalServiceVersion { V1 } +// The following contains the per-rpc structs for the parameters and the result. -// ExecPlanFragment +// ExecQueryFInstances -struct TExecPlanFragmentParams { +struct TExecQueryFInstancesParams { 1: required ImpalaInternalServiceVersion protocol_version - // Context of the query, which this fragment is part of. - 2: optional TQueryCtx query_ctx + // this backend's index into Coordinator::backend_states_, + // needed for subsequent rpcs to the coordinator + // required in V1 + 2: optional i32 coord_state_idx - // Context of this fragment. - 3: optional TPlanFragmentCtx fragment_ctx + // required in V1 + 3: optional TQueryCtx query_ctx - // Context of this fragment instance, including its instance id, the total number - // fragment instances, the query context, etc. 
- 4: optional TPlanFragmentInstanceCtx fragment_instance_ctx + // required in V1 + 4: list<TPlanFragmentCtx> fragment_ctxs + + // the order corresponds to the order of fragments in fragment_ctxs + // required in V1 + 5: list<TPlanFragmentInstanceCtx> fragment_instance_ctxs } -struct TExecPlanFragmentResult { +struct TExecQueryFInstancesResult { // required in V1 1: optional Status.TStatus status } + // ReportExecStatus + struct TParquetInsertStats { // For each column, the on disk byte size 1: required map<string, i64> per_column_size @@ -509,33 +525,43 @@ struct TErrorLogEntry { 2: list<string> messages } -struct TReportExecStatusParams { - 1: required ImpalaInternalServiceVersion protocol_version - - // required in V1 - 2: optional Types.TUniqueId query_id - +struct TFragmentInstanceExecStatus { // required in V1 - 3: optional Types.TUniqueId fragment_instance_id + 1: optional Types.TUniqueId fragment_instance_id // Status of fragment execution; any error status means it's done. // required in V1 - 4: optional Status.TStatus status + 2: optional Status.TStatus status // If true, fragment finished executing. 
// required in V1 - 5: optional bool done + 3: optional bool done // cumulative profile // required in V1 - 6: optional RuntimeProfile.TRuntimeProfileTree profile + 4: optional RuntimeProfile.TRuntimeProfileTree profile +} + +struct TReportExecStatusParams { + 1: required ImpalaInternalServiceVersion protocol_version - // Cumulative structural changes made by a table sink + // required in V1 + 2: optional Types.TUniqueId query_id + + // same as TExecQueryFInstancesParams.coord_state_idx + // required in V1 + 3: optional i32 coord_state_idx + + 4: list<TFragmentInstanceExecStatus> instance_exec_status + + // Cumulative structural changes made by the table sink of any instance + // included in instance_exec_status // optional in V1 - 7: optional TInsertExecStatus insert_exec_status; + 5: optional TInsertExecStatus insert_exec_status; - // New errors that have not been reported to the coordinator - 8: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log; + // New errors that have not been reported to the coordinator by any of the + // instances included in instance_exec_status + 6: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log; } struct TReportExecStatusResult { @@ -544,16 +570,16 @@ struct TReportExecStatusResult { } -// CancelPlanFragment +// CancelQueryFInstances -struct TCancelPlanFragmentParams { +struct TCancelQueryFInstancesParams { 1: required ImpalaInternalServiceVersion protocol_version // required in V1 - 2: optional Types.TUniqueId fragment_instance_id + 2: optional Types.TUniqueId query_id } -struct TCancelPlanFragmentResult { +struct TCancelQueryFInstancesResult { // required in V1 1: optional Status.TStatus status } @@ -573,7 +599,7 @@ struct TTransmitDataParams { // required in V1 4: optional Types.TPlanNodeId dest_node_id - // required in V1 + // optional in V1 5: optional Results.TRowBatch row_batch // if set to true, indicates that no more row batches will be sent @@ -587,6 +613,7 @@ struct TTransmitDataResult { } // 
Parameters for RequestPoolService.resolveRequestPool() +// TODO: why is this here? struct TResolveRequestPoolParams { // User to resolve to a pool via the allocation placement policy and // authorize for pool access. @@ -611,6 +638,7 @@ struct TResolveRequestPoolResult { } // Parameters for RequestPoolService.getPoolConfig() +// TODO: why is this here? struct TPoolConfigParams { // Pool name 1: required string pool @@ -655,48 +683,68 @@ struct TBloomFilter { 4: required bool always_true } -struct TUpdateFilterResult { -} +// UpdateFilter struct TUpdateFilterParams { + 1: required ImpalaInternalServiceVersion protocol_version + // Filter ID, unique within a query. - 1: required i32 filter_id + // required in V1 + 2: optional i32 filter_id // Query that this filter is for. - 2: required Types.TUniqueId query_id + // required in V1 + 3: optional Types.TUniqueId query_id - 3: required TBloomFilter bloom_filter + // required in V1 + 4: optional TBloomFilter bloom_filter } -struct TPublishFilterResult { - +struct TUpdateFilterResult { } + +// PublishFilter + struct TPublishFilterParams { + 1: required ImpalaInternalServiceVersion protocol_version + // Filter ID to update - 1: required i32 filter_id + // required in V1 + 2: optional i32 filter_id - // ID of fragment to receive this filter - 2: required Types.TUniqueId dst_instance_id + // required in V1 + 3: optional Types.TUniqueId dst_query_id + + // Index of fragment to receive this filter + // required in V1 + 4: optional Types.TFragmentIdx dst_fragment_idx // Actual bloom_filter payload - 3: required TBloomFilter bloom_filter + // required in V1 + 5: optional TBloomFilter bloom_filter } +struct TPublishFilterResult { +} + + service ImpalaInternalService { - // Called by coord to start asynchronous execution of plan fragment in backend. + // Called by coord to start asynchronous execution of a query's fragment instances in + // backend. // Returns as soon as all incoming data streams have been set up. 
- TExecPlanFragmentResult ExecPlanFragment(1:TExecPlanFragmentParams params); + TExecQueryFInstancesResult ExecQueryFInstances(1:TExecQueryFInstancesParams params); - // Periodically called by backend to report status of plan fragment execution + // Periodically called by backend to report status of fragment instance execution // back to coord; also called when execution is finished, for whatever reason. TReportExecStatusResult ReportExecStatus(1:TReportExecStatusParams params); - // Called by coord to cancel execution of a single plan fragment, which this - // coordinator initiated with a prior call to ExecPlanFragment. + // Called by coord to cancel execution of a single query's fragment instances, which + // the coordinator initiated with a prior call to ExecQueryFInstances. // Cancellation is asynchronous. - TCancelPlanFragmentResult CancelPlanFragment(1:TCancelPlanFragmentParams params); + TCancelQueryFInstancesResult CancelQueryFInstances( + 1:TCancelQueryFInstancesParams params); // Called by sender to transmit single row batch. Returns error indication // if params.fragmentId or params.destNodeId are unknown or if data couldn't be read. @@ -706,7 +754,7 @@ service ImpalaInternalService { // the coordinator for aggregation and broadcast. TUpdateFilterResult UpdateFilter(1:TUpdateFilterParams params); - // Called by the coordinator to deliver global runtime filters to fragment instances for + // Called by the coordinator to deliver global runtime filters to fragments for // application at plan nodes. 
TPublishFilterResult PublishFilter(1:TPublishFilterParams params); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/tests/common/test_result_verifier.py ---------------------------------------------------------------------- diff --git a/tests/common/test_result_verifier.py b/tests/common/test_result_verifier.py index a816eaf..7e929f1 100644 --- a/tests/common/test_result_verifier.py +++ b/tests/common/test_result_verifier.py @@ -490,7 +490,6 @@ def compute_aggregation(function, field, runtime_profile): field_regex_re = re.compile(field_regex) inside_avg_fragment = False avg_fragment_indent = None - past_avg_fragment = False match_list = [] for line in runtime_profile.splitlines(): # Detect the boundaries of the averaged fragment by looking at indentation. @@ -498,18 +497,17 @@ def compute_aggregation(function, field, runtime_profile): # its children are at a greater indent. When the indentation gets back to # the level of the averaged fragment start, then the averaged fragment # is done. + if start_avg_fragment_re.match(line): + inside_avg_fragment = True + avg_fragment_indent = len(line) - len(line.lstrip()) + continue + if inside_avg_fragment: indentation = len(line) - len(line.lstrip()) if indentation > avg_fragment_indent: continue else: inside_avg_fragment = False - past_avg_fragment = True - - if not past_avg_fragment and start_avg_fragment_re.match(line): - inside_avg_fragment = True - avg_fragment_indent = len(line) - len(line.lstrip()) - continue if (field_regex_re.search(line)): match_list.extend(re.findall(field_regex, line))
