http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index 8c423c0..c970163 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "runtime/runtime-state.h" + #include <iostream> #include <jni.h> #include <sstream> @@ -27,11 +29,11 @@ #include "exprs/expr.h" #include "runtime/buffered-block-mgr.h" #include "runtime/descriptors.h" -#include "runtime/runtime-state.h" -#include "runtime/timestamp-value.h" #include "runtime/data-stream-mgr.h" #include "runtime/data-stream-recvr.h" -#include "runtime/runtime-filter.h" +#include "runtime/mem-tracker.h" +#include "runtime/runtime-filter-bank.h" +#include "runtime/timestamp-value.h" #include "util/bitmap.h" #include "util/cpu-info.h" #include "util/debug-util.h" @@ -75,7 +77,7 @@ RuntimeState::RuntimeState(const TExecPlanFragmentParams& fragment_params, is_cancelled_(false), query_resource_mgr_(NULL), root_node_id_(-1), - filter_bank_(fragment_ctx().query_ctx, this) { + filter_bank_(new RuntimeFilterBank(fragment_ctx().query_ctx, this)) { Status status = Init(exec_env); DCHECK(status.ok()) << status.GetDetail(); } @@ -90,7 +92,7 @@ RuntimeState::RuntimeState(const TQueryCtx& query_ctx) is_cancelled_(false), query_resource_mgr_(NULL), root_node_id_(-1), - filter_bank_(query_ctx, this) { + filter_bank_(new RuntimeFilterBank(query_ctx, this)) { fragment_params_.fragment_instance_ctx.__set_query_ctx(query_ctx); fragment_params_.fragment_instance_ctx.query_ctx.request.query_options .__set_batch_size(DEFAULT_BATCH_SIZE);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index 860692f..15c8d9c 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -16,40 +16,34 @@ #ifndef IMPALA_RUNTIME_RUNTIME_STATE_H #define IMPALA_RUNTIME_RUNTIME_STATE_H -/// needed for scoped_ptr to work on ObjectPool -#include "common/object-pool.h" - #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> #include <vector> #include <string> -/// stringstream is a typedef, so can't forward declare it. -#include <sstream> -#include "scheduling/query-resource-mgr.h" +// NOTE: try not to add more headers here: runtime-state.h is included in many many files. +#include "common/global-types.h" // for PlanNodeId +#include "runtime/client-cache-types.h" #include "runtime/exec-env.h" -#include "runtime/descriptors.h" // for PlanNodeId -#include "runtime/disk-io-mgr.h" // for DiskIoMgr::RequestContext -#include "runtime/mem-tracker.h" -#include "runtime/runtime-filter.h" #include "runtime/thread-resource-mgr.h" -#include "gen-cpp/PlanNodes_types.h" -#include "gen-cpp/Types_types.h" // for TUniqueId -#include "gen-cpp/ImpalaInternalService_types.h" // for TQueryOptions -#include "util/auth-util.h" +#include "util/auth-util.h" // for GetEffectiveUser() #include "util/runtime-profile.h" namespace impala { class BufferedBlockMgr; +class DataStreamRecvr; class DescriptorTbl; -class ObjectPool; -class Status; -class ExecEnv; +class DiskIoRequestContext; class Expr; class LlvmCodeGen; +class MemTracker; +class ObjectPool; +class RuntimeFilterBank; +class Status; class TimestampValue; -class DataStreamRecvr; +class TQueryOptions; +class TUniqueId; /// Counts how many rows an INSERT query has added to a particular partition /// (partitions are identified by their partition keys: k1=v1/k2=v2 @@ 
-140,7 +134,7 @@ class RuntimeState { ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; } FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; } - std::vector<DiskIoMgr::RequestContext*>* reader_contexts() { return &reader_contexts_; } + std::vector<DiskIoRequestContext*>* reader_contexts() { return &reader_contexts_; } void set_fragment_root_id(PlanNodeId id) { DCHECK_EQ(root_node_id_, -1) << "Should not set this twice."; @@ -151,7 +145,7 @@ class RuntimeState { /// See comment on root_node_id_. We add one to prevent having a hash seed of 0. uint32_t fragment_hash_seed() const { return root_node_id_ + 1; } - RuntimeFilterBank* filter_bank() { return &filter_bank_; } + RuntimeFilterBank* filter_bank() { return filter_bank_.get(); } PartitionStatusMap* per_partition_status() { return &per_partition_status_; } @@ -348,7 +342,7 @@ class RuntimeState { QueryResourceMgr* query_resource_mgr_; /// Reader contexts that need to be closed when the fragment is closed. - std::vector<DiskIoMgr::RequestContext*> reader_contexts_; + std::vector<DiskIoRequestContext*> reader_contexts_; /// BufferedBlockMgr object used to allocate and manage blocks of input data in memory /// with a fixed memory budget. @@ -366,7 +360,7 @@ class RuntimeState { /// Manages runtime filters that are either produced or consumed (or both!) by plan /// nodes that share this runtime state. 
- RuntimeFilterBank filter_bank_; + boost::scoped_ptr<RuntimeFilterBank> filter_bank_; /// prohibit copies RuntimeState(const RuntimeState&); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/sorted-run-merger.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc index 0376437..328d1c2 100644 --- a/be/src/runtime/sorted-run-merger.cc +++ b/be/src/runtime/sorted-run-merger.cc @@ -18,7 +18,7 @@ #include "runtime/row-batch.h" #include "runtime/sorter.h" #include "runtime/tuple-row.h" -#include "util/runtime-profile.h" +#include "util/runtime-profile-counters.h" #include "common/names.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/sorter.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc index e0d388a..299dd2f 100644 --- a/be/src/runtime/sorter.cc +++ b/be/src/runtime/sorter.cc @@ -22,7 +22,7 @@ #include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "runtime/sorted-run-merger.h" -#include "util/runtime-profile.h" +#include "util/runtime-profile-counters.h" #include "common/names.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/string-buffer.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h index c8f79df..ec59806 100644 --- a/be/src/runtime/string-buffer.h +++ b/be/src/runtime/string-buffer.h @@ -18,6 +18,7 @@ #include "common/status.h" #include "runtime/mem-pool.h" +#include "runtime/mem-tracker.h" #include "runtime/string-value.h" using namespace strings; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/thread-resource-mgr.h ---------------------------------------------------------------------- diff --git 
a/be/src/runtime/thread-resource-mgr.h b/be/src/runtime/thread-resource-mgr.h index cb2c13e..00aaeba 100644 --- a/be/src/runtime/thread-resource-mgr.h +++ b/be/src/runtime/thread-resource-mgr.h @@ -18,10 +18,7 @@ #include <stdlib.h> #include <boost/function.hpp> -#include <boost/scoped_ptr.hpp> -#include <boost/shared_ptr.hpp> #include <boost/thread/mutex.hpp> -#include <boost/thread/thread.hpp> #include <list> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/tuple.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc index fe85948..93e63f3 100644 --- a/be/src/runtime/tuple.cc +++ b/be/src/runtime/tuple.cc @@ -29,6 +29,7 @@ #include "runtime/string-value.h" #include "runtime/tuple-row.h" #include "util/debug-util.h" +#include "util/runtime-profile-counters.h" #include "common/names.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/scheduling/admission-controller.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index 70174a1..94e21b8 100644 --- a/be/src/scheduling/admission-controller.cc +++ b/be/src/scheduling/admission-controller.cc @@ -25,7 +25,7 @@ #include "runtime/mem-tracker.h" #include "util/debug-util.h" #include "util/time.h" -#include "util/runtime-profile.h" +#include "util/runtime-profile-counters.h" #include "util/pretty-printer.h" #include "common/names.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/scheduling/query-resource-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-resource-mgr.cc b/be/src/scheduling/query-resource-mgr.cc index 5687269..bf42329 100644 --- a/be/src/scheduling/query-resource-mgr.cc +++ b/be/src/scheduling/query-resource-mgr.cc @@ -22,6 +22,7 @@ #include 
"runtime/exec-env.h" #include "resourcebroker/resource-broker.h" #include "util/bit-util.h" +#include "util/cgroups-mgr.h" #include "util/container-util.h" #include "util/network-util.h" #include "util/promise.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/scheduling/simple-scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc index 7abc916..a14509c 100644 --- a/be/src/scheduling/simple-scheduler.cc +++ b/be/src/scheduling/simple-scheduler.cc @@ -24,6 +24,7 @@ #include "common/logging.h" #include "util/metrics.h" +#include "resourcebroker/resource-broker.h" #include "runtime/exec-env.h" #include "runtime/coordinator.h" #include "service/impala-server.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/fragment-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc index 680e295..38866f1 100644 --- a/be/src/service/fragment-exec-state.cc +++ b/be/src/service/fragment-exec-state.cc @@ -20,6 +20,7 @@ #include "gen-cpp/ImpalaInternalService.h" #include "rpc/thrift-util.h" #include "gutil/strings/substitute.h" +#include "runtime/runtime-filter-bank.h" #include "util/bloom-filter.h" #include "runtime/backend-client.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/fragment-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-mgr.cc b/be/src/service/fragment-mgr.cc index 5210f8c..f64ca7e 100644 --- a/be/src/service/fragment-mgr.cc +++ b/be/src/service/fragment-mgr.cc @@ -20,6 +20,7 @@ #include "service/fragment-exec-state.h" #include "runtime/exec-env.h" +#include "runtime/mem-tracker.h" #include "util/impalad-metrics.h" #include "util/uid-util.h" 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/impala-beeswax-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc index 478c1ea..35cb945 100644 --- a/be/src/service/impala-beeswax-server.cc +++ b/be/src/service/impala-beeswax-server.cc @@ -41,6 +41,7 @@ #include "runtime/plan-fragment-executor.h" #include "runtime/hdfs-fs-cache.h" #include "runtime/exec-env.h" +#include "runtime/mem-tracker.h" #include "runtime/raw-value.inline.h" #include "runtime/timestamp-value.h" #include "scheduling/simple-scheduler.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/impala-hs2-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index 1dc4982..ce5df38 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -31,11 +31,12 @@ #include "common/logging.h" #include "common/version.h" #include "exprs/expr.h" +#include "rpc/thrift-util.h" #include "runtime/raw-value.h" #include "service/query-exec-state.h" #include "service/query-options.h" #include "util/debug-util.h" -#include "rpc/thrift-util.h" +#include "util/runtime-profile-counters.h" #include "util/impalad-metrics.h" #include "util/string-parser.h" #include "service/hs2-util.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/query-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc index b534902..ec70c47 100644 --- a/be/src/service/query-exec-state.cc +++ b/be/src/service/query-exec-state.cc @@ -18,6 +18,8 @@ #include "exprs/expr.h" #include "exprs/expr-context.h" +#include "resourcebroker/resource-broker.h" +#include 
"runtime/mem-tracker.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "service/impala-server.h" @@ -25,6 +27,7 @@ #include "service/query-options.h" #include "util/debug-util.h" #include "util/impalad-metrics.h" +#include "util/runtime-profile-counters.h" #include "util/time.h" #include "gen-cpp/CatalogService.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/statestore/statestore-subscriber.h ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h index 3808459..ddf663e 100644 --- a/be/src/statestore/statestore-subscriber.h +++ b/be/src/statestore/statestore-subscriber.h @@ -21,23 +21,22 @@ #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread/mutex.hpp> -#include <boost/thread/thread.hpp> #include "statestore/statestore.h" #include "util/stopwatch.h" #include "rpc/thrift-util.h" #include "rpc/thrift-client.h" -#include "util/thread.h" #include "util/metrics.h" #include "gen-cpp/StatestoreService.h" #include "gen-cpp/StatestoreSubscriber.h" namespace impala { -class TimeoutFailureDetector; class Status; -class TNetworkAddress; +class TimeoutFailureDetector; +class Thread; class ThriftServer; +class TNetworkAddress; typedef ClientCache<StatestoreServiceClient> StatestoreClientCache; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/testutil/desc-tbl-builder.cc ---------------------------------------------------------------------- diff --git a/be/src/testutil/desc-tbl-builder.cc b/be/src/testutil/desc-tbl-builder.cc index 358ae54..3b3994a 100644 --- a/be/src/testutil/desc-tbl-builder.cc +++ b/be/src/testutil/desc-tbl-builder.cc @@ -15,7 +15,7 @@ #include "testutil/desc-tbl-builder.h" #include "util/bit-util.h" - +#include "common/object-pool.h" #include "runtime/descriptors.h" #include "common/names.h" 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/testutil/desc-tbl-builder.h ---------------------------------------------------------------------- diff --git a/be/src/testutil/desc-tbl-builder.h b/be/src/testutil/desc-tbl-builder.h index ec816ee..fdaf67e 100644 --- a/be/src/testutil/desc-tbl-builder.h +++ b/be/src/testutil/desc-tbl-builder.h @@ -16,6 +16,7 @@ #define IMPALA_TESTUTIL_ROW_DESC_BUILDER_H_ #include "runtime/runtime-state.h" +#include "runtime/types.h" namespace impala { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/auth-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/auth-util.cc b/be/src/util/auth-util.cc index e75a250..afe037c 100644 --- a/be/src/util/auth-util.cc +++ b/be/src/util/auth-util.cc @@ -14,6 +14,8 @@ #include "util/auth-util.h" +#include "gen-cpp/ImpalaInternalService_types.h" + using namespace std; namespace impala { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/auth-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/auth-util.h b/be/src/util/auth-util.h index e44bf70..d82c61a 100644 --- a/be/src/util/auth-util.h +++ b/be/src/util/auth-util.h @@ -18,16 +18,16 @@ #include <string> -#include "gen-cpp/ImpalaInternalService_types.h" - namespace impala { - /// Returns a reference to the "effective user" from the specified session. Queries - /// are run and authorized on behalf of the effective user. When a delegated_user is - /// specified (is not empty), the effective user is the delegated_user. This is because - /// the connected_user is acting as a "proxy user" for the delegated_user. When - /// delegated_user is empty, the effective user is the connected user. - const std::string& GetEffectiveUser(const TSessionState& session); +class TSessionState; + +/// Returns a reference to the "effective user" from the specified session. 
Queries +/// are run and authorized on behalf of the effective user. When a delegated_user is +/// specified (is not empty), the effective user is the delegated_user. This is because +/// the connected_user is acting as a "proxy user" for the delegated_user. When +/// delegated_user is empty, the effective user is the connected user. +const std::string& GetEffectiveUser(const TSessionState& session); } // namespace impala #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/avro-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/avro-util.cc b/be/src/util/avro-util.cc index b591f23..5cce8a3 100644 --- a/be/src/util/avro-util.cc +++ b/be/src/util/avro-util.cc @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include <avro/schema.h> - #include "util/avro-util.h" +#include <avro/schema.h> +#include <sstream> + using namespace std; namespace impala { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/codec.h ---------------------------------------------------------------------- diff --git a/be/src/util/codec.h b/be/src/util/codec.h index afe3791..a337983 100644 --- a/be/src/util/codec.h +++ b/be/src/util/codec.h @@ -17,8 +17,6 @@ #define IMPALA_UTIL_CODEC_H #include "common/status.h" -#include "runtime/mem-pool.h" -#include "util/runtime-profile.h" #include <boost/scoped_ptr.hpp> #include "gen-cpp/Descriptors_types.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/decompress.cc ---------------------------------------------------------------------- diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc index 588aff6..8a2ea74 100644 --- a/be/src/util/decompress.cc +++ b/be/src/util/decompress.cc @@ -15,6 +15,7 @@ #include <boost/assign/list_of.hpp> #include "util/decompress.h" #include "exec/read-write-util.h" +#include 
"runtime/mem-tracker.h" #include "runtime/runtime-state.h" #include "common/logging.h" #include "gen-cpp/Descriptors_types.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/dict-encoding.h ---------------------------------------------------------------------- diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h index 7a48886..795a6b1 100644 --- a/be/src/util/dict-encoding.h +++ b/be/src/util/dict-encoding.h @@ -24,7 +24,6 @@ #include "runtime/mem-pool.h" #include "runtime/string-value.h" #include "util/rle-encoding.h" -#include "util/runtime-profile.h" namespace impala { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/hdfs-bulk-ops-defs.h ---------------------------------------------------------------------- diff --git a/be/src/util/hdfs-bulk-ops-defs.h b/be/src/util/hdfs-bulk-ops-defs.h new file mode 100644 index 0000000..13e1be0 --- /dev/null +++ b/be/src/util/hdfs-bulk-ops-defs.h @@ -0,0 +1,31 @@ +// Copyright 2016 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef IMPALA_UTIL_HDFS_BULK_OPS_DEFS_H +#define IMPALA_UTIL_HDFS_BULK_OPS_DEFS_H + +namespace impala { + +/// Forward declarations for HDFS ops. 
+template <typename T> +class ThreadPool; + +class HdfsOp; +class HdfsOperationSet; + +typedef ThreadPool<HdfsOp> HdfsOpThreadPool; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/hdfs-bulk-ops.h ---------------------------------------------------------------------- diff --git a/be/src/util/hdfs-bulk-ops.h b/be/src/util/hdfs-bulk-ops.h index e66f0d1..97feaa0 100644 --- a/be/src/util/hdfs-bulk-ops.h +++ b/be/src/util/hdfs-bulk-ops.h @@ -21,6 +21,7 @@ #include "common/hdfs.h" #include "common/atomic.h" #include "common/status.h" +#include "util/hdfs-bulk-ops-defs.h" #include "util/thread-pool.h" #include "util/counting-barrier.h" #include "runtime/hdfs-fs-cache.h" @@ -36,8 +37,6 @@ enum HdfsOpType { CHMOD }; -class HdfsOperationSet; - /// Container class that encapsulates a single HDFS operation. Used only internally by /// HdfsOperationSet, but visible because it parameterises HdfsOpThreadPool. class HdfsOp { @@ -81,8 +80,6 @@ class HdfsOp { void AddError(const string& error_msg) const; }; -typedef ThreadPool<HdfsOp> HdfsOpThreadPool; - /// Creates a new HdfsOp-processing thread pool. 
HdfsOpThreadPool* CreateHdfsOpThreadPool(const std::string& name, uint32_t num_threads, uint32_t max_queue_length); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/metrics.h ---------------------------------------------------------------------- diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h index ab4b6c2..9ddaa48 100644 --- a/be/src/util/metrics.h +++ b/be/src/util/metrics.h @@ -24,8 +24,8 @@ #include <boost/thread/locks.hpp> #include "common/logging.h" -#include "common/status.h" #include "common/object-pool.h" +#include "common/status.h" #include "util/debug-util.h" #include "util/json-util.h" #include "util/pretty-printer.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/periodic-counter-updater.cc ---------------------------------------------------------------------- diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc index a3979d8..d1cbac5 100644 --- a/be/src/util/periodic-counter-updater.cc +++ b/be/src/util/periodic-counter-updater.cc @@ -14,6 +14,7 @@ #include "util/periodic-counter-updater.h" +#include "util/runtime-profile-counters.h" #include "util/time.h" #include "common/names.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile-counters.h ---------------------------------------------------------------------- diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h new file mode 100644 index 0000000..4b51c4f --- /dev/null +++ b/be/src/util/runtime-profile-counters.h @@ -0,0 +1,488 @@ +// Copyright 2016 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +#ifndef IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H +#define IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H + +#include <boost/scoped_ptr.hpp> +#include <boost/unordered_map.hpp> +#include <sys/time.h> +#include <sys/resource.h> + +#include "common/atomic.h" +#include "common/logging.h" +#include "util/runtime-profile.h" +#include "util/stopwatch.h" +#include "util/streaming-sampler.h" + +namespace impala { + +/// Define macros for updating counters. The macros make it very easy to disable +/// all counters at compile time. Set this to 0 to remove counters. This is useful +/// to do to make sure the counters aren't affecting the system. 
+#define ENABLE_COUNTERS 1 + +/// Some macro magic to generate unique ids using __COUNTER__ (each scoped-object macro below declares a uniquely named local) +#define CONCAT_IMPL(x, y) x##y +#define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y) + +#if ENABLE_COUNTERS + #define ADD_COUNTER(profile, name, unit) (profile)->AddCounter(name, unit) + #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) \ + (profile)->AddTimeSeriesCounter(name, src_counter) + #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS) + #define ADD_CHILD_TIMER(profile, name, parent) \ + (profile)->AddCounter(name, TUnit::TIME_NS, parent) + #define SCOPED_TIMER(c) \ + ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c) + #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \ + ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled) + #define COUNTER_ADD(c, v) (c)->Add(v) + #define COUNTER_SET(c, v) (c)->Set(v) + #define ADD_THREAD_COUNTERS(profile, prefix) (profile)->AddThreadCounters(prefix) + #define SCOPED_THREAD_COUNTER_MEASUREMENT(c) \ + ThreadCounterMeasurement \ + MACRO_CONCAT(SCOPED_THREAD_COUNTER_MEASUREMENT, __COUNTER__)(c) + #define SCOPED_CONCURRENT_COUNTER(c) \ + ScopedStopWatch<RuntimeProfile::ConcurrentTimerCounter> \ + MACRO_CONCAT(SCOPED_CONCURRENT_COUNTER, __COUNTER__)(c) +#else // Counters compiled out: every macro keeps the parameter list of its enabled form so call sites still compile. + #define ADD_COUNTER(profile, name, unit) NULL + #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) NULL + #define ADD_TIMER(profile, name) NULL + #define ADD_CHILD_TIMER(profile, name, parent) NULL + #define SCOPED_TIMER(c) + #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) + #define COUNTER_ADD(c, v) + #define COUNTER_SET(c, v) + #define ADD_THREAD_COUNTERS(profile, prefix) NULL + #define SCOPED_THREAD_COUNTER_MEASUREMENT(c) + #define SCOPED_CONCURRENT_COUNTER(c) +#endif + +/// A counter that keeps track of the highest value seen (reporting that +/// as value()) and the current value.
+class RuntimeProfile::HighWaterMarkCounter : public RuntimeProfile::Counter { + public: + HighWaterMarkCounter(TUnit::type unit) : Counter(unit) {} + + virtual void Add(int64_t delta) { + int64_t new_val = current_value_.Add(delta); // Result of the atomic add (presumably the post-add value; confirm in common/atomic.h). + UpdateMax(new_val); + } + + /// Tries to increase the current value by delta. If current_value() + delta + /// exceeds max, returns false and leaves current_value unchanged; otherwise returns true. + bool TryAdd(int64_t delta, int64_t max) { + while (true) { // Retry until the compare-and-swap succeeds or the limit check fails. + int64_t old_val = current_value_.Load(); + int64_t new_val = old_val + delta; + if (UNLIKELY(new_val > max)) return false; + if (LIKELY(current_value_.CompareAndSwap(old_val, new_val))) { + UpdateMax(new_val); + return true; + } + } + } + + virtual void Set(int64_t v) { + current_value_.Store(v); + UpdateMax(v); // The high water mark only ever moves up, even if 'v' lowers the current value. + } + + int64_t current_value() const { return current_value_.Load(); } + + private: + /// Set 'value_' to 'v' if 'v' is larger than 'value_'. The entire operation is + /// atomic: the compare-and-swap is retried until it succeeds or 'v' is no longer larger. + void UpdateMax(int64_t v) { + while (true) { + int64_t old_max = value_.Load(); + int64_t new_max = std::max(old_max, v); + if (new_max == old_max) break; // Avoid atomic update. + if (LIKELY(value_.CompareAndSwap(old_max, new_max))) break; + } + } + + /// The current value of the counter. value_ in the super class represents + /// the high water mark. + AtomicInt64 current_value_; +}; + +/// A DerivedCounter also has a name and unit, but the value is computed. +/// Do not call Set() and Add(). +class RuntimeProfile::DerivedCounter : public RuntimeProfile::Counter { + public: + DerivedCounter(TUnit::type unit, const DerivedCounterFunction& counter_fn) + : Counter(unit), + counter_fn_(counter_fn) {} + + virtual int64_t value() const { + return counter_fn_(); // Invoked on every call; nothing is cached in this class. + } + + private: + DerivedCounterFunction counter_fn_; +}; + +/// An AveragedCounter maintains a set of counters and its value is the +/// average of the values in that set.
The average is updated through calls +/// to UpdateCounter(), which may add a new counter or update an existing counter. +/// Set() and Add() should not be called. +class RuntimeProfile::AveragedCounter : public RuntimeProfile::Counter { + public: + AveragedCounter(TUnit::type unit) + : Counter(unit), + current_double_sum_(0.0), + current_int_sum_(0) { + } + + /// Update counter_value_map_ with the new counter. This may require the counter + /// to be added to the map. + /// No locks are obtained within this class because UpdateCounter() is called from + /// UpdateAverage(), which obtains locks on the entire counter map in a profile. + void UpdateCounter(Counter* new_counter) { + DCHECK_EQ(new_counter->unit_, unit_); + boost::unordered_map<Counter*, int64_t>::iterator it = + counter_value_map_.find(new_counter); + int64_t old_val = 0; // Previous contribution of this counter; stays 0 when it is new to the map. + if (it != counter_value_map_.end()) { + old_val = it->second; + it->second = new_counter->value(); + } else { + counter_value_map_[new_counter] = new_counter->value(); + } + + if (unit_ == TUnit::DOUBLE_VALUE) { + double old_double_val = *reinterpret_cast<double*>(&old_val); // For this unit the stored int64 is reinterpreted as a double. + current_double_sum_ += (new_counter->double_value() - old_double_val); + double result_val = current_double_sum_ / (double) counter_value_map_.size(); + value_.Store(*reinterpret_cast<int64_t*>(&result_val)); // The average's bit pattern is kept in the int64 value_. + } else { + current_int_sum_ += (new_counter->value() - old_val); + value_.Store(current_int_sum_ / static_cast<int64_t>(counter_value_map_.size())); // Divide as signed: size() is unsigned and would turn a negative sum into a huge value. + } + } + + /// The value for this counter should be updated through UpdateCounter(). + /// Set() and Add() should not be used. + virtual void Set(double value) { + DCHECK(false); + } + + virtual void Set(int64_t value) { + DCHECK(false); + } + + virtual void Add(int64_t delta) { + DCHECK(false); + } + + private: + /// Map from counters to their existing values. Modified via UpdateCounter(). + boost::unordered_map<Counter*, int64_t> counter_value_map_; + + /// Current sums of values from counter_value_map_.
Only one of these is used, + /// depending on the unit of the counter. current_double_sum_ is used for + /// DOUBLE_VALUE, current_int_sum_ otherwise. + double current_double_sum_; + int64_t current_int_sum_; +}; + +/// A set of counters that measure thread info, such as total time, user time, sys time. +class RuntimeProfile::ThreadCounters { + private: + friend class ThreadCounterMeasurement; + friend class RuntimeProfile; + + Counter* total_time_; // total wall clock time + Counter* user_time_; // user CPU time + Counter* sys_time_; // system CPU time + + /// The number of times a context switch resulted due to a process voluntarily giving + /// up the processor before its time slice was completed. + Counter* voluntary_context_switches_; + + /// The number of times a context switch resulted due to a higher priority process + /// becoming runnable or because the current process exceeded its time slice. + Counter* involuntary_context_switches_; +}; + +/// An EventSequence captures a sequence of events (each added by +/// calling MarkEvent). Each event has a text label, and a time +/// (measured relative to the moment Start() was called as t=0). It is +/// useful for tracking the evolution of some serial process, such as +/// the query lifecycle. +class RuntimeProfile::EventSequence { + public: + EventSequence() { } + + /// Helper constructor for building from Thrift; 'timestamps' and 'labels' are paired by index. + EventSequence(const std::vector<int64_t>& timestamps, + const std::vector<std::string>& labels) { + DCHECK(timestamps.size() == labels.size()); + for (size_t i = 0; i < timestamps.size(); ++i) { + events_.push_back(make_pair(labels[i], timestamps[i])); + } + } + + /// Starts the timer without resetting it. + void Start() { sw_.Start(); } + + /// Stores an event in sequence with the given label and the current time + /// (relative to the first time Start() was called) as the timestamp.
+ void MarkEvent(const std::string& label) { + boost::lock_guard<SpinLock> event_lock(lock_); // Lock before reading the clock so events are appended in increasing time order. + Event event = make_pair(label, sw_.ElapsedTime()); + events_.push_back(event); + } + + int64_t ElapsedTime() { return sw_.ElapsedTime(); } + + /// An Event is a <label, timestamp> pair. + typedef std::pair<std::string, int64_t> Event; + + /// An EventList is a sequence of Events, in increasing timestamp order. + typedef std::vector<Event> EventList; + + /// Copies the member events_ into the supplied vector 'events'. + /// The supplied vector 'events' is cleared before this. + void GetEvents(std::vector<Event>* events) { + events->clear(); + boost::lock_guard<SpinLock> event_lock(lock_); + events->insert(events->end(), events_.begin(), events_.end()); + } + + void ToThrift(TEventSequence* seq) const; + + private: + /// Protect access to events_. + SpinLock lock_; + + /// Stored in increasing time order. + EventList events_; + + /// Timer which allows events to be timestamped when they are recorded. + MonotonicStopWatch sw_; +}; + +typedef StreamingSampler<int64_t, 64> StreamingCounterSampler; +class RuntimeProfile::TimeSeriesCounter { + public: + std::string DebugString() const; + + void AddSample(int ms_elapsed) { + int64_t sample = sample_fn_(); // NOTE(review): sample_fn_ is NULL for counters built from existing data; presumably AddSample() is never called on those - confirm. + samples_.AddSample(sample, ms_elapsed); + } + + private: + friend class RuntimeProfile; + + TimeSeriesCounter(const std::string& name, TUnit::type unit, + DerivedCounterFunction fn) + : name_(name), unit_(unit), sample_fn_(fn) { + } + + /// Construct a time series object from existing sample data. This counter + /// is then read-only (i.e. there is no sample function).
+ TimeSeriesCounter(const std::string& name, TUnit::type unit, int period, + const std::vector<int64_t>& values) + : name_(name), unit_(unit), sample_fn_(NULL), samples_(period, values) { + } + + void ToThrift(TTimeSeriesCounter* counter); + + std::string name_; + TUnit::type unit_; + DerivedCounterFunction sample_fn_; + StreamingCounterSampler samples_; +}; + +/// Counter whose value comes from an internal ConcurrentStopWatch to track multiple threads +/// concurrent running time. +class RuntimeProfile::ConcurrentTimerCounter : public Counter { + public: + ConcurrentTimerCounter(TUnit::type unit) : Counter(unit) {} + + virtual int64_t value() const { return csw_.TotalRunningTime(); } + + void Start() { csw_.Start(); } + + void Stop() { csw_.Stop(); } + + /// Returns lap time for caller who wants delta update of concurrent running time. + uint64_t LapTime() { return csw_.LapTime(); } + + /// The value for this counter should come from internal ConcurrentStopWatch. + /// Set() and Add() should not be used. + virtual void Set(double value) { + DCHECK(false); + } + + virtual void Set(int64_t value) { + DCHECK(false); + } + + virtual void Set(int value) { + DCHECK(false); + } + + virtual void Add(int64_t delta) { + DCHECK(false); + } + + private: + ConcurrentStopWatch csw_; +}; + +/// Utility class to mark an event when the object is destroyed. +class ScopedEvent { + public: + ScopedEvent(RuntimeProfile::EventSequence* event_sequence, const std::string& label) + : label_(label), + event_sequence_(event_sequence) { + } + + /// Mark the event when the object is destroyed + ~ScopedEvent() { + event_sequence_->MarkEvent(label_); + } + + private: + /// Disable copy constructor and assignment + ScopedEvent(const ScopedEvent& event); + ScopedEvent& operator=(const ScopedEvent& event); + + const std::string label_; + RuntimeProfile::EventSequence* event_sequence_; +}; + +/// Utility class to update time elapsed when the object goes out of scope.
+/// 'T' must implement the StopWatch "interface" (Start,Stop,ElapsedTime) but +/// we use templates not to pay for virtual function overhead. In some cases +/// the runtime profile may be deleted while the counter is still active. In this +/// case the is_cancelled argument can be provided so that ScopedTimer will not +/// update the counter when the query is cancelled. The destructor for ScopedTimer +/// can access both is_cancelled and the counter, so the caller must ensure that it +/// is safe to access both at the end of the scope in which the timer is used. +template<class T> +class ScopedTimer { + public: + ScopedTimer(RuntimeProfile::Counter* counter, const bool* is_cancelled = NULL) : + counter_(counter), is_cancelled_(is_cancelled){ + if (counter == NULL) return; + DCHECK(counter->unit() == TUnit::TIME_NS); + sw_.Start(); + } + + void Stop() { sw_.Stop(); } + void Start() { sw_.Start(); } + + void UpdateCounter() { + if (counter_ != NULL && !IsCancelled()) { + counter_->Add(sw_.ElapsedTime()); + } + } + + /// Updates the underlying counter for the final time and clears the pointer to it. + void ReleaseCounter() { + UpdateCounter(); + counter_ = NULL; + } + + bool IsCancelled() { + return is_cancelled_ != NULL && *is_cancelled_; + } + + /// Update counter when object is destroyed + ~ScopedTimer() { + sw_.Stop(); + UpdateCounter(); + } + + private: + /// Disable copy constructor and assignment + ScopedTimer(const ScopedTimer& timer); + ScopedTimer& operator=(const ScopedTimer& timer); + + T sw_; + RuntimeProfile::Counter* counter_; + const bool* is_cancelled_; +}; + + +#ifdef __APPLE__ +// On OS X rusage via thread is not supported. In addition, the majority of the fields of +// the usage structs will be zeroed out. Since Apple is not going to be a major plaform +// initially it will most likely be enough to capture only time. +// C.f. 
http://blog.kuriositaet.de/?p=257 +#define RUSAGE_THREAD RUSAGE_SELF +#endif + +/// Utility class to update ThreadCounter when the object goes out of scope or when Stop is +/// called. Threads measurements will then be taken using getrusage. +/// This is ~5x slower than ScopedTimer due to calling getrusage. +class ThreadCounterMeasurement { + public: + ThreadCounterMeasurement(RuntimeProfile::ThreadCounters* counters) : + stop_(false), counters_(counters) { + DCHECK(counters != NULL); + sw_.Start(); + int ret = getrusage(RUSAGE_THREAD, &usage_base_); + DCHECK_EQ(ret, 0); + } + + /// Stop and update the counter + void Stop() { + if (stop_) return; + stop_ = true; + sw_.Stop(); + rusage usage; + int ret = getrusage(RUSAGE_THREAD, &usage); + DCHECK_EQ(ret, 0); + int64_t utime_diff = + (usage.ru_utime.tv_sec - usage_base_.ru_utime.tv_sec) * 1000L * 1000L * 1000L + + (usage.ru_utime.tv_usec - usage_base_.ru_utime.tv_usec) * 1000L; + int64_t stime_diff = + (usage.ru_stime.tv_sec - usage_base_.ru_stime.tv_sec) * 1000L * 1000L * 1000L + + (usage.ru_stime.tv_usec - usage_base_.ru_stime.tv_usec) * 1000L; + counters_->total_time_->Add(sw_.ElapsedTime()); + counters_->user_time_->Add(utime_diff); + counters_->sys_time_->Add(stime_diff); + counters_->voluntary_context_switches_->Add(usage.ru_nvcsw - usage_base_.ru_nvcsw); + counters_->involuntary_context_switches_->Add( + usage.ru_nivcsw - usage_base_.ru_nivcsw); + } + + /// Update counter when object is destroyed + ~ThreadCounterMeasurement() { + Stop(); + } + + private: + /// Disable copy constructor and assignment + ThreadCounterMeasurement(const ThreadCounterMeasurement& timer); + ThreadCounterMeasurement& operator=(const ThreadCounterMeasurement& timer); + + bool stop_; + rusage usage_base_; + MonotonicStopWatch sw_; + RuntimeProfile::ThreadCounters* counters_; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile-test.cc 
---------------------------------------------------------------------- diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc index d98b54c..2c80082 100644 --- a/be/src/util/runtime-profile-test.cc +++ b/be/src/util/runtime-profile-test.cc @@ -20,7 +20,7 @@ #include "common/object-pool.h" #include "util/cpu-info.h" #include "util/periodic-counter-updater.h" -#include "util/runtime-profile.h" +#include "util/runtime-profile-counters.h" #include "util/streaming-sampler.h" #include "util/thread.h" #include "util/time.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile.cc ---------------------------------------------------------------------- diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc index bc7447a..714180a 100644 --- a/be/src/util/runtime-profile.cc +++ b/be/src/util/runtime-profile.cc @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-#include "util/runtime-profile.h" +#include "util/runtime-profile-counters.h" #include <iomanip> #include <iostream> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile.h ---------------------------------------------------------------------- diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h index 8030d7b..98a02ed 100644 --- a/be/src/util/runtime-profile.h +++ b/be/src/util/runtime-profile.h @@ -17,64 +17,16 @@ #define IMPALA_UTIL_RUNTIME_PROFILE_H #include <boost/function.hpp> -#include <boost/scoped_ptr.hpp> -#include <boost/unordered_map.hpp> +#include <boost/thread/lock_guard.hpp> #include <iostream> -#include <sys/time.h> -#include <sys/resource.h> #include "common/atomic.h" -#include "common/logging.h" -#include "common/object-pool.h" -#include "util/stopwatch.h" -#include "util/streaming-sampler.h" +#include "util/spinlock.h" + #include "gen-cpp/RuntimeProfile_types.h" namespace impala { -/// Define macros for updating counters. The macros make it very easy to disable -/// all counters at compile time. Set this to 0 to remove counters. This is useful -/// to do to make sure the counters aren't affecting the system. 
-#define ENABLE_COUNTERS 1 - -/// Some macro magic to generate unique ids using __COUNTER__ -#define CONCAT_IMPL(x, y) x##y -#define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y) - -#if ENABLE_COUNTERS - #define ADD_COUNTER(profile, name, unit) (profile)->AddCounter(name, unit) - #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) \ - (profile)->AddTimeSeriesCounter(name, src_counter) - #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS) - #define ADD_CHILD_TIMER(profile, name, parent) \ - (profile)->AddCounter(name, TUnit::TIME_NS, parent) - #define SCOPED_TIMER(c) \ - ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c) - #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \ - ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled) - #define COUNTER_ADD(c, v) (c)->Add(v) - #define COUNTER_SET(c, v) (c)->Set(v) - #define ADD_THREAD_COUNTERS(profile, prefix) (profile)->AddThreadCounters(prefix) - #define SCOPED_THREAD_COUNTER_MEASUREMENT(c) \ - ThreadCounterMeasurement \ - MACRO_CONCAT(SCOPED_THREAD_COUNTER_MEASUREMENT, __COUNTER__)(c) - #define SCOPED_CONCURRENT_COUNTER(c) \ - ScopedStopWatch<RuntimeProfile::ConcurrentTimerCounter> \ - MACRO_CONCAT(SCOPED_CONCURRENT_COUNTER, __COUNTER__)(c) -#else - #define ADD_COUNTER(profile, name, unit) NULL - #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) NULL - #define ADD_TIMER(profile, name) NULL - #define ADD_CHILD_TIMER(profile, name, parent) NULL - #define SCOPED_TIMER(c) - #define CANCEL_SAFE_SCOPED_TIMER(c) - #define COUNTER_ADD(c, v) - #define COUNTER_SET(c, v) - #define ADD_THREAD_COUNTERS(profile, prefix) NULL - #define SCOPED_THREAD_COUNTER_MEASUREMENT(c) - #define SCOPED_CONCURRENT_COUNTER(c) -#endif - class ObjectPool; /// Runtime profile is a group of profiling counters. 
It supports adding named counters @@ -133,285 +85,16 @@ class RuntimeProfile { TUnit::type unit_; }; - /// A counter that keeps track of the highest value seen (reporting that - /// as value()) and the current value. - class HighWaterMarkCounter : public Counter { - public: - HighWaterMarkCounter(TUnit::type unit) : Counter(unit) {} - - virtual void Add(int64_t delta) { - int64_t new_val = current_value_.Add(delta); - UpdateMax(new_val); - } - - /// Tries to increase the current value by delta. If current_value() + delta - /// exceeds max, return false and current_value is not changed. - bool TryAdd(int64_t delta, int64_t max) { - while (true) { - int64_t old_val = current_value_.Load(); - int64_t new_val = old_val + delta; - if (UNLIKELY(new_val > max)) return false; - if (LIKELY(current_value_.CompareAndSwap(old_val, new_val))) { - UpdateMax(new_val); - return true; - } - } - } - - virtual void Set(int64_t v) { - current_value_.Store(v); - UpdateMax(v); - } - - int64_t current_value() const { return current_value_.Load(); } - - private: - /// Set 'value_' to 'v' if 'v' is larger than 'value_'. The entire operation is - /// atomic. - void UpdateMax(int64_t v) { - while (true) { - int64_t old_max = value_.Load(); - int64_t new_max = std::max(old_max, v); - if (new_max == old_max) break; // Avoid atomic update. - if (LIKELY(value_.CompareAndSwap(old_max, new_max))) break; - } - } - - /// The current value of the counter. value_ in the super class represents - /// the high water mark. - AtomicInt64 current_value_; - }; + class AveragedCounter; + class ConcurrentTimerCounter; + class DerivedCounter; + class EventSequence; + class HighWaterMarkCounter; + class ThreadCounters; + class TimeSeriesCounter; typedef boost::function<int64_t ()> DerivedCounterFunction; - /// A DerivedCounter also has a name and unit, but the value is computed. - /// Do not call Set() and Add(). 
- class DerivedCounter : public Counter { - public: - DerivedCounter(TUnit::type unit, const DerivedCounterFunction& counter_fn) - : Counter(unit), - counter_fn_(counter_fn) {} - - virtual int64_t value() const { - return counter_fn_(); - } - - private: - DerivedCounterFunction counter_fn_; - }; - - /// An AveragedCounter maintains a set of counters and its value is the - /// average of the values in that set. The average is updated through calls - /// to UpdateCounter(), which may add a new counter or update an existing counter. - /// Set() and Add() should not be called. - class AveragedCounter : public Counter { - public: - AveragedCounter(TUnit::type unit) - : Counter(unit), - current_double_sum_(0.0), - current_int_sum_(0) { - } - - /// Update counter_value_map_ with the new counter. This may require the counter - /// to be added to the map. - /// No locks are obtained within this class because UpdateCounter() is called from - /// UpdateAverage(), which obtains locks on the entire counter map in a profile. - void UpdateCounter(Counter* new_counter) { - DCHECK_EQ(new_counter->unit_, unit_); - boost::unordered_map<Counter*, int64_t>::iterator it = - counter_value_map_.find(new_counter); - int64_t old_val = 0; - if (it != counter_value_map_.end()) { - old_val = it->second; - it->second = new_counter->value(); - } else { - counter_value_map_[new_counter] = new_counter->value(); - } - - if (unit_ == TUnit::DOUBLE_VALUE) { - double old_double_val = *reinterpret_cast<double*>(&old_val); - current_double_sum_ += (new_counter->double_value() - old_double_val); - double result_val = current_double_sum_ / (double) counter_value_map_.size(); - value_.Store(*reinterpret_cast<int64_t*>(&result_val)); - } else { - current_int_sum_ += (new_counter->value() - old_val); - value_.Store(current_int_sum_ / counter_value_map_.size()); - } - } - - /// The value for this counter should be updated through UpdateCounter(). - /// Set() and Add() should not be used. 
- virtual void Set(double value) { - DCHECK(false); - } - - virtual void Set(int64_t value) { - DCHECK(false); - } - - virtual void Add(int64_t delta) { - DCHECK(false); - } - - private: - /// Map from counters to their existing values. Modified via UpdateCounter(). - boost::unordered_map<Counter*, int64_t> counter_value_map_; - - /// Current sums of values from counter_value_map_. Only one of these is used, - /// depending on the unit of the counter. current_double_sum_ is used for - /// DOUBLE_VALUE, current_int_sum_ otherwise. - double current_double_sum_; - int64_t current_int_sum_; - }; - - /// A set of counters that measure thread info, such as total time, user time, sys time. - class ThreadCounters { - private: - friend class ThreadCounterMeasurement; - friend class RuntimeProfile; - - Counter* total_time_; // total wall clock time - Counter* user_time_; // user CPU time - Counter* sys_time_; // system CPU time - - /// The number of times a context switch resulted due to a process voluntarily giving - /// up the processor before its time slice was completed. - Counter* voluntary_context_switches_; - - /// The number of times a context switch resulted due to a higher priority process - /// becoming runnable or because the current process exceeded its time slice. - Counter* involuntary_context_switches_; - }; - - /// An EventSequence captures a sequence of events (each added by - /// calling MarkEvent). Each event has a text label, and a time - /// (measured relative to the moment Start() was called as t=0). It is - /// useful for tracking the evolution of some serial process, such as - /// the query lifecycle. 
- class EventSequence { - public: - EventSequence() { } - - /// Helper constructor for building from Thrift - EventSequence(const std::vector<int64_t>& timestamps, - const std::vector<std::string>& labels) { - DCHECK(timestamps.size() == labels.size()); - for (int i = 0; i < timestamps.size(); ++i) { - events_.push_back(make_pair(labels[i], timestamps[i])); - } - } - - /// Starts the timer without resetting it. - void Start() { sw_.Start(); } - - /// Stores an event in sequence with the given label and the current time - /// (relative to the first time Start() was called) as the timestamp. - void MarkEvent(const std::string& label) { - Event event = make_pair(label, sw_.ElapsedTime()); - boost::lock_guard<SpinLock> event_lock(lock_); - events_.push_back(event); - } - - int64_t ElapsedTime() { return sw_.ElapsedTime(); } - - /// An Event is a <label, timestamp> pair. - typedef std::pair<std::string, int64_t> Event; - - /// An EventList is a sequence of Events, in increasing timestamp order. - typedef std::vector<Event> EventList; - - /// Copies the member events_ into the supplied vector 'events'. - /// The supplied vector 'events' is cleared before this. - void GetEvents(std::vector<Event>* events) { - events->clear(); - boost::lock_guard<SpinLock> event_lock(lock_); - events->insert(events->end(), events_.begin(), events_.end()); - } - - void ToThrift(TEventSequence* seq) const; - - private: - /// Protect access to events_. - SpinLock lock_; - - /// Stored in increasing time order. - EventList events_; - - /// Timer which allows events to be timestamped when they are recorded. 
- MonotonicStopWatch sw_; - }; - - typedef StreamingSampler<int64_t, 64> StreamingCounterSampler; - class TimeSeriesCounter { - public: - std::string DebugString() const; - - void AddSample(int ms_elapsed) { - int64_t sample = sample_fn_(); - samples_.AddSample(sample, ms_elapsed); - } - - private: - friend class RuntimeProfile; - - TimeSeriesCounter(const std::string& name, TUnit::type unit, - DerivedCounterFunction fn) - : name_(name), unit_(unit), sample_fn_(fn) { - } - - /// Construct a time series object from existing sample data. This counter - /// is then read-only (i.e. there is no sample function). - TimeSeriesCounter(const std::string& name, TUnit::type unit, int period, - const std::vector<int64_t>& values) - : name_(name), unit_(unit), sample_fn_(NULL), samples_(period, values) { - } - - void ToThrift(TTimeSeriesCounter* counter); - - std::string name_; - TUnit::type unit_; - DerivedCounterFunction sample_fn_; - StreamingCounterSampler samples_; - }; - - /// Counter whose value comes from an internal ConcurrentStopWatch to track multiple threads - /// concurrent running time. - class ConcurrentTimerCounter : public Counter { - public: - ConcurrentTimerCounter(TUnit::type unit) : Counter(unit) {} - - virtual int64_t value() const { return csw_.TotalRunningTime(); } - - void Start() { csw_.Start(); } - - void Stop() { csw_.Stop(); } - - /// Returns lap time for caller who wants delta update of concurrent running time. - uint64_t LapTime() { return csw_.LapTime(); } - - /// The value for this counter should come from internal ConcurrentStopWatch. - /// Set() and Add() should not be used. - virtual void Set(double value) { - DCHECK(false); - } - - virtual void Set(int64_t value) { - DCHECK(false); - } - - virtual void Set(int value) { - DCHECK(false); - } - - virtual void Add(int64_t delta) { - DCHECK(false); - } - - private: - ConcurrentStopWatch csw_; - }; - - /// Create a runtime profile object with 'name'. 
Counters and merged profile are /// allocated from pool. /// If is_averaged_profile is true, the counters in this profile will be derived @@ -721,166 +404,6 @@ class RuntimeProfile { const ChildCounterMap& child_counter_map, std::ostream* s); }; -/// Utility class to mark an event when the object is destroyed. -class ScopedEvent { - public: - ScopedEvent(RuntimeProfile::EventSequence* event_sequence, const std::string& label) - : label_(label), - event_sequence_(event_sequence) { - } - - /// Mark the event when the object is destroyed - ~ScopedEvent() { - event_sequence_->MarkEvent(label_); - } - - private: - /// Disable copy constructor and assignment - ScopedEvent(const ScopedEvent& event); - ScopedEvent& operator=(const ScopedEvent& event); - - const std::string label_; - RuntimeProfile::EventSequence* event_sequence_; -}; - -/// Utility class to update the counter at object construction and destruction. -/// When the object is constructed, decrement the counter by val. -/// When the object goes out of scope, increment the counter by val. -class ScopedCounter { - public: - ScopedCounter(RuntimeProfile::Counter* counter, int64_t val) : - val_(val), - counter_(counter) { - if (counter == NULL) return; - counter_->Add(-1L * val_); - } - - /// Increment the counter when object is destroyed - ~ScopedCounter() { - if (counter_ != NULL) counter_->Add(val_); - } - - private: - /// Disable copy constructor and assignment - ScopedCounter(const ScopedCounter& counter); - ScopedCounter& operator=(const ScopedCounter& counter); - - int64_t val_; - RuntimeProfile::Counter* counter_; -}; - -/// Utility class to update time elapsed when the object goes out of scope. -/// 'T' must implement the StopWatch "interface" (Start,Stop,ElapsedTime) but -/// we use templates not to pay for virtual function overhead. In some cases -/// the runtime profile may be deleted while the counter is still active. 
In this -/// case the is_cancelled argument can be provided so that ScopedTimer will not -/// update the counter when the query is cancelled. The destructor for ScopedTimer -/// can access both is_cancelled and the counter, so the caller must ensure that it -/// is safe to access both at the end of the scope in which the timer is used. -template<class T> -class ScopedTimer { - public: - ScopedTimer(RuntimeProfile::Counter* counter, const bool* is_cancelled = NULL) : - counter_(counter), is_cancelled_(is_cancelled){ - if (counter == NULL) return; - DCHECK(counter->unit() == TUnit::TIME_NS); - sw_.Start(); - } - - void Stop() { sw_.Stop(); } - void Start() { sw_.Start(); } - - void UpdateCounter() { - if (counter_ != NULL && !IsCancelled()) { - counter_->Add(sw_.ElapsedTime()); - } - } - - /// Updates the underlying counter for the final time and clears the pointer to it. - void ReleaseCounter() { - UpdateCounter(); - counter_ = NULL; - } - - bool IsCancelled() { - return is_cancelled_ != NULL && *is_cancelled_; - } - - /// Update counter when object is destroyed - ~ScopedTimer() { - sw_.Stop(); - UpdateCounter(); - } - - private: - /// Disable copy constructor and assignment - ScopedTimer(const ScopedTimer& timer); - ScopedTimer& operator=(const ScopedTimer& timer); - - T sw_; - RuntimeProfile::Counter* counter_; - const bool* is_cancelled_; -}; - -#ifdef __APPLE__ -// On OS X rusage via thread is not supported. In addition, the majority of the fields of -// the usage structs will be zeroed out. Since Apple is not going to be a major plaform -// initially it will most likely be enough to capture only time. -// C.f. http://blog.kuriositaet.de/?p=257 -#define RUSAGE_THREAD RUSAGE_SELF -#endif - -/// Utility class to update ThreadCounter when the object goes out of scope or when Stop is -/// called. Threads measurements will then be taken using getrusage. -/// This is ~5x slower than ScopedTimer due to calling getrusage. 
-class ThreadCounterMeasurement { - public: - ThreadCounterMeasurement(RuntimeProfile::ThreadCounters* counters) : - stop_(false), counters_(counters) { - DCHECK(counters != NULL); - sw_.Start(); - int ret = getrusage(RUSAGE_THREAD, &usage_base_); - DCHECK_EQ(ret, 0); - } - - /// Stop and update the counter - void Stop() { - if (stop_) return; - stop_ = true; - sw_.Stop(); - rusage usage; - int ret = getrusage(RUSAGE_THREAD, &usage); - DCHECK_EQ(ret, 0); - int64_t utime_diff = - (usage.ru_utime.tv_sec - usage_base_.ru_utime.tv_sec) * 1000L * 1000L * 1000L + - (usage.ru_utime.tv_usec - usage_base_.ru_utime.tv_usec) * 1000L; - int64_t stime_diff = - (usage.ru_stime.tv_sec - usage_base_.ru_stime.tv_sec) * 1000L * 1000L * 1000L + - (usage.ru_stime.tv_usec - usage_base_.ru_stime.tv_usec) * 1000L; - counters_->total_time_->Add(sw_.ElapsedTime()); - counters_->user_time_->Add(utime_diff); - counters_->sys_time_->Add(stime_diff); - counters_->voluntary_context_switches_->Add(usage.ru_nvcsw - usage_base_.ru_nvcsw); - counters_->involuntary_context_switches_->Add( - usage.ru_nivcsw - usage_base_.ru_nivcsw); - } - - /// Update counter when object is destroyed - ~ThreadCounterMeasurement() { - Stop(); - } - - private: - /// Disable copy constructor and assignment - ThreadCounterMeasurement(const ThreadCounterMeasurement& timer); - ThreadCounterMeasurement& operator=(const ThreadCounterMeasurement& timer); - - bool stop_; - rusage usage_base_; - MonotonicStopWatch sw_; - RuntimeProfile::ThreadCounters* counters_; -}; - } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/simple-logger.cc ---------------------------------------------------------------------- diff --git a/be/src/util/simple-logger.cc b/be/src/util/simple-logger.cc index 6fca3fa..87ef0f5 100644 --- a/be/src/util/simple-logger.cc +++ b/be/src/util/simple-logger.cc @@ -18,6 +18,7 @@ #include <boost/date_time/posix_time/posix_time_types.hpp> #include <boost/filesystem.hpp> 
#include <gutil/strings/substitute.h> +#include <boost/thread/lock_guard.hpp> #include "common/names.h" #include "util/logging-support.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/simple-logger.h ---------------------------------------------------------------------- diff --git a/be/src/util/simple-logger.h b/be/src/util/simple-logger.h index 66031fd..af17d26 100644 --- a/be/src/util/simple-logger.h +++ b/be/src/util/simple-logger.h @@ -15,8 +15,8 @@ #ifndef IMPALA_SERVICE_SIMPLE_LOGGER_H #define IMPALA_SERVICE_SIMPLE_LOGGER_H -#include <boost/thread/thread.hpp> #include <fstream> +#include <boost/thread/mutex.hpp> #include "common/status.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/streaming-sampler.h ---------------------------------------------------------------------- diff --git a/be/src/util/streaming-sampler.h b/be/src/util/streaming-sampler.h index 9cf4151..71e7548 100644 --- a/be/src/util/streaming-sampler.h +++ b/be/src/util/streaming-sampler.h @@ -17,6 +17,8 @@ #include <string.h> #include <iostream> +#include <boost/thread/lock_guard.hpp> + #include "util/spinlock.h" namespace impala { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/tuple-row-compare.cc ---------------------------------------------------------------------- diff --git a/be/src/util/tuple-row-compare.cc b/be/src/util/tuple-row-compare.cc index a0bbf97..18ae53d 100644 --- a/be/src/util/tuple-row-compare.cc +++ b/be/src/util/tuple-row-compare.cc @@ -19,6 +19,7 @@ #include "codegen/codegen-anyval.h" #include "codegen/llvm-codegen.h" #include "runtime/runtime-state.h" +#include "util/runtime-profile-counters.h" using namespace impala; using namespace llvm; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/tuple-row-compare.h ---------------------------------------------------------------------- diff --git a/be/src/util/tuple-row-compare.h 
b/be/src/util/tuple-row-compare.h index 8505f19..d20d864 100644 --- a/be/src/util/tuple-row-compare.h +++ b/be/src/util/tuple-row-compare.h @@ -19,10 +19,11 @@ #include "exec/sort-exec-exprs.h" #include "exprs/expr.h" #include "exprs/expr-context.h" +#include "runtime/descriptors.h" +#include "runtime/raw-value.h" #include "runtime/raw-value.inline.h" #include "runtime/tuple.h" #include "runtime/tuple-row.h" -#include "runtime/descriptors.h" namespace impala {
