Add AtomicInt32/AtomicInt64 typedefs Only certain primitive types can be used with AtomicInt<>, so let's treat AtomicInt<> as an implementation detail and instead expose typedefs for the Atomic integer sizes we support. Convert all decls to use the new typedefs.
Change-Id: I8b91ba684aabc67ed1721c7b7320aa42049268c8 Reviewed-on: http://gerrit.cloudera.org:8080/2601 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/10e7de79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/10e7de79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/10e7de79 Branch: refs/heads/master Commit: 10e7de7969f96fe3edb00fcab3f7a8c432bd20a0 Parents: c039791 Author: Dan Hecht <[email protected]> Authored: Tue Mar 22 14:24:23 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Wed Mar 23 22:29:18 2016 +0000 ---------------------------------------------------------------------- be/src/common/atomic-test.cc | 3 ++- be/src/common/atomic.h | 12 +++++++++++- be/src/exec/hdfs-scan-node.h | 8 ++++---- be/src/rpc/rpc-trace.h | 2 +- be/src/runtime/data-stream-recvr.h | 2 +- be/src/runtime/disk-io-mgr-internal.h | 24 ++++++++++++------------ be/src/runtime/disk-io-mgr-test.cc | 18 +++++++++--------- be/src/runtime/disk-io-mgr.h | 4 ++-- be/src/runtime/lib-cache.h | 2 +- be/src/runtime/mem-tracker.cc | 2 +- be/src/runtime/mem-tracker.h | 2 +- be/src/runtime/plan-fragment-executor.h | 2 +- be/src/scheduling/query-resource-mgr.cc | 8 ++++---- be/src/scheduling/query-resource-mgr.h | 8 ++++---- be/src/util/counting-barrier.h | 2 +- be/src/util/hdfs-bulk-ops.h | 2 +- be/src/util/internal-queue-test.cc | 6 +++--- be/src/util/periodic-counter-updater.h | 2 +- be/src/util/progress-updater.h | 4 ++-- be/src/util/runtime-profile.h | 4 ++-- 20 files changed, 64 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/common/atomic-test.cc ---------------------------------------------------------------------- diff --git a/be/src/common/atomic-test.cc b/be/src/common/atomic-test.cc index 5715022..4e4559c 100644 --- a/be/src/common/atomic-test.cc +++ b/be/src/common/atomic-test.cc @@ -20,9 +20,10 @@ #include "common/names.h" - namespace impala { +using namespace internal; // Testing AtomicInt<> directly. + // Simple test to make sure there is no obvious error in the operations. This is not // intended to test the thread safety. template<typename T> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/common/atomic.h ---------------------------------------------------------------------- diff --git a/be/src/common/atomic.h b/be/src/common/atomic.h index 07a9036..b0b8faa 100644 --- a/be/src/common/atomic.h +++ b/be/src/common/atomic.h @@ -45,7 +45,10 @@ class AtomicUtil { } }; -/// Atomic integer. 'T' can be either 32-bit or 64-bit signed integer. Each operation +namespace internal { + +/// Atomic integer. This class template should not be used directly; instead use the +/// typedefs below. 'T' can be either 32-bit or 64-bit signed integer. Each operation /// is performed atomically and has a specified memory-ordering semantic: /// /// Acquire: these operations ensure no later memory access by the same thread can be @@ -102,6 +105,13 @@ class AtomicInt { DISALLOW_COPY_AND_ASSIGN(AtomicInt); }; +} // namespace internal + +/// Supported atomic types. Use these types rather than referring to AtomicInt<> +/// directly. +typedef internal::AtomicInt<int32_t> AtomicInt32; +typedef internal::AtomicInt<int64_t> AtomicInt64; + } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/exec/hdfs-scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h index 4ddd33d..73b9527 100644 --- a/be/src/exec/hdfs-scan-node.h +++ b/be/src/exec/hdfs-scan-node.h @@ -369,7 +369,7 @@ class HdfsScanNode : public ScanNode { int64_t scanner_thread_bytes_required_; /// Number of files that have not been issued from the scanners. - AtomicInt<int> num_unqueued_files_; + AtomicInt32 num_unqueued_files_; /// Map of HdfsScanner objects to file types. Only one scanner object will be /// created for each file type. Objects stored in runtime_state's pool. @@ -424,12 +424,12 @@ class HdfsScanNode : public ScanNode { /// This is the number of io buffers that are owned by the scan node and the scanners. /// This is used just to help debug leaked io buffers to determine if the leak is /// happening in the scanners vs other parts of the execution. - AtomicInt<int> num_owned_io_buffers_; + AtomicInt32 num_owned_io_buffers_; /// Counters which track the number of scanners that have codegen enabled for the /// materialize and conjuncts evaluation code paths. - AtomicInt<int> num_scanners_codegen_enabled_; - AtomicInt<int> num_scanners_codegen_disabled_; + AtomicInt32 num_scanners_codegen_enabled_; + AtomicInt32 num_scanners_codegen_disabled_; /// The size of the largest compressed text file to be scanned. This is used to /// estimate scanner thread memory usage. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/rpc/rpc-trace.h ---------------------------------------------------------------------- diff --git a/be/src/rpc/rpc-trace.h b/be/src/rpc/rpc-trace.h index aed05f4..5abe0a2 100644 --- a/be/src/rpc/rpc-trace.h +++ b/be/src/rpc/rpc-trace.h @@ -84,7 +84,7 @@ class RpcEventHandler : public apache::thrift::TProcessorEventHandler { StatsMetric<double>* time_stats; /// Number of invocations in flight - AtomicInt<int32_t> num_in_flight; + AtomicInt32 num_in_flight; }; /// Map from method name to descriptor http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/data-stream-recvr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-recvr.h b/be/src/runtime/data-stream-recvr.h index 7879c44..2a3b184 100644 --- a/be/src/runtime/data-stream-recvr.h +++ b/be/src/runtime/data-stream-recvr.h @@ -136,7 +136,7 @@ class DataStreamRecvr { bool is_merging_; /// total number of bytes held across all sender queues. - AtomicInt<int> num_buffered_bytes_; + AtomicInt32 num_buffered_bytes_; /// Memtracker for batches in the sender queue(s). boost::scoped_ptr<MemTracker> mem_tracker_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/disk-io-mgr-internal.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-internal.h b/be/src/runtime/disk-io-mgr-internal.h index fe607e7..e58a3f3 100644 --- a/be/src/runtime/disk-io-mgr-internal.h +++ b/be/src/runtime/disk-io-mgr-internal.h @@ -212,48 +212,48 @@ class DiskIoMgr::RequestContext { RuntimeProfile::Counter* disks_accessed_bitmap_; /// Total number of bytes read locally, updated at end of each range scan - AtomicInt<int64_t> bytes_read_local_; + AtomicInt64 bytes_read_local_; /// Total number of bytes read via short circuit read, updated at end of each range scan - AtomicInt<int64_t> bytes_read_short_circuit_; + AtomicInt64 bytes_read_short_circuit_; /// Total number of bytes read from date node cache, updated at end of each range scan - AtomicInt<int64_t> bytes_read_dn_cache_; + AtomicInt64 bytes_read_dn_cache_; /// Total number of bytes from remote reads that were expected to be local. - AtomicInt<int64_t> unexpected_remote_bytes_; + AtomicInt64 unexpected_remote_bytes_; /// The number of buffers that have been returned to the reader (via GetNext) that the /// reader has not returned. Only included for debugging and diagnostics. - AtomicInt<int> num_buffers_in_reader_; + AtomicInt32 num_buffers_in_reader_; /// The number of scan ranges that have been completed for this reader. - AtomicInt<int> num_finished_ranges_; + AtomicInt32 num_finished_ranges_; /// The number of scan ranges that required a remote read, updated at the end of each /// range scan. Only used for diagnostics. - AtomicInt<int> num_remote_ranges_; + AtomicInt32 num_remote_ranges_; /// The total number of scan ranges that have not been started. Only used for /// diagnostics. This is the sum of all unstarted_scan_ranges across all disks. - AtomicInt<int> num_unstarted_scan_ranges_; + AtomicInt32 num_unstarted_scan_ranges_; /// The number of buffers that are being used for this reader. This is the sum /// of all buffers in ScanRange queues and buffers currently being read into (i.e. about /// to be queued). - AtomicInt<int> num_used_buffers_; + AtomicInt32 num_used_buffers_; /// The total number of ready buffers across all ranges. Ready buffers are buffers /// that have been read from disk but not retrieved by the caller. /// This is the sum of all queued buffers in all ranges for this reader context. - AtomicInt<int> num_ready_buffers_; + AtomicInt32 num_ready_buffers_; /// The total (sum) of queue capacities for finished scan ranges. This value /// divided by num_finished_ranges_ is the average for finished ranges and /// used to seed the starting queue capacity for future ranges. The assumption /// is that if previous ranges were fast, new ones will be fast too. The scan /// range adjusts the queue capacity dynamically so a rough approximation will do. - AtomicInt<int> total_range_queue_capacity_; + AtomicInt32 total_range_queue_capacity_; /// The initial queue size for new scan ranges. This is always /// total_range_queue_capacity_ / num_finished_ranges_ but stored as a separate @@ -447,7 +447,7 @@ class DiskIoMgr::RequestContext { /// entire operation, we need this ref count to keep track of which thread should do /// final resource cleanup during cancellation. /// Only the thread that sees the count at 0 should do the final cleanup. - AtomicInt<int> num_threads_in_op_; + AtomicInt32 num_threads_in_op_; /// Queue of write ranges to process for this disk. A write range is always added /// to in_flight_ranges_ in GetNextRequestRange(). There is a separate http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/disk-io-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc index 806a036..ee89f56 100644 --- a/be/src/runtime/disk-io-mgr-test.cc +++ b/be/src/runtime/disk-io-mgr-test.cc @@ -136,7 +136,7 @@ class DiskIoMgrTest : public testing::Test { // Updates num_ranges_processed with the number of ranges seen by this thread. static void ScanRangeThread(DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader, const char* expected_result, int expected_len, const Status& expected_status, - int max_ranges, AtomicInt<int>* num_ranges_processed) { + int max_ranges, AtomicInt32* num_ranges_processed) { int num_ranges = 0; while (max_ranges == 0 || num_ranges < max_ranges) { DiskIoMgr::ScanRange* range; @@ -373,7 +373,7 @@ TEST_F(DiskIoMgrTest, SingleReader) { } ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); - AtomicInt<int> num_ranges_processed; + AtomicInt32 num_ranges_processed; thread_group threads; for (int i = 0; i < num_read_threads; ++i) { threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, @@ -432,7 +432,7 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) { stat_val.st_mtime)); } } - AtomicInt<int> num_ranges_processed; + AtomicInt32 num_ranges_processed; // Issue first half the scan ranges. ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_first_half)); @@ -507,7 +507,7 @@ TEST_F(DiskIoMgrTest, SyncReadTest) { } ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); - AtomicInt<int> num_ranges_processed; + AtomicInt32 num_ranges_processed; thread_group threads; for (int i = 0; i < 5; ++i) { threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, @@ -570,7 +570,7 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) { } ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); - AtomicInt<int> num_ranges_processed; + AtomicInt32 num_ranges_processed; int num_succesful_ranges = ranges.size() / 2; // Read half the ranges for (int i = 0; i < num_succesful_ranges; ++i) { @@ -637,7 +637,7 @@ TEST_F(DiskIoMgrTest, MemLimits) { // Don't return buffers to force memory pressure vector<DiskIoMgr::BufferDescriptor*> buffers; - AtomicInt<int> num_ranges_processed; + AtomicInt32 num_ranges_processed; ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::MemLimitExceeded(), 1, &num_ranges_processed); @@ -717,7 +717,7 @@ TEST_F(DiskIoMgrTest, CachedReads) { } ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); - AtomicInt<int> num_ranges_processed; + AtomicInt32 num_ranges_processed; thread_group threads; for (int i = 0; i < 5; ++i) { threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, @@ -780,7 +780,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) { while (read_offset < file_size) { for (int context_index = 0; context_index < num_contexts; ++context_index) { if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters; - AtomicInt<int> num_ranges_processed; + AtomicInt32 num_ranges_processed; thread_group threads; vector<DiskIoMgr::ScanRange*> ranges; int num_scan_ranges = min<int>(num_reads_queued, write_offset - read_offset); @@ -898,7 +898,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) { ASSERT_OK(io_mgr.AddScanRanges(readers[i], ranges)); } - AtomicInt<int> num_ranges_processed; + AtomicInt32 num_ranges_processed; thread_group threads; for (int i = 0; i < NUM_READERS; ++i) { for (int j = 0; j < NUM_THREADS_PER_READER; ++j) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/disk-io-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h index d695555..b130d52 100644 --- a/be/src/runtime/disk-io-mgr.h +++ b/be/src/runtime/disk-io-mgr.h @@ -734,10 +734,10 @@ class DiskIoMgr { std::list<BufferDescriptor*> free_buffer_descs_; /// Total number of allocated buffers, used for debugging. - AtomicInt<int> num_allocated_buffers_; + AtomicInt32 num_allocated_buffers_; /// Total number of buffers in readers - AtomicInt<int> num_buffers_in_readers_; + AtomicInt32 num_buffers_in_readers_; /// Per disk queues. This is static and created once at Init() time. One queue is /// allocated for each local disk on the system and for each remote filesystem type. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/lib-cache.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h index 005f166..b36a859 100644 --- a/be/src/runtime/lib-cache.h +++ b/be/src/runtime/lib-cache.h @@ -117,7 +117,7 @@ class LibCache { /// The number of libs that have been copied from HDFS to the local FS. /// This is appended to the local fs path to remove collisions. - AtomicInt<int64_t> num_libs_copied_; + AtomicInt64 num_libs_copied_; /// Protects lib_cache_. For lock ordering, this lock must always be taken before /// the per entry lock. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/mem-tracker.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc index 1fc462f..e12c946 100644 --- a/be/src/runtime/mem-tracker.cc +++ b/be/src/runtime/mem-tracker.cc @@ -39,7 +39,7 @@ MemTracker::RequestTrackersMap MemTracker::request_to_mem_trackers_; MemTracker::PoolTrackersMap MemTracker::pool_to_mem_trackers_; mutex MemTracker::static_mem_trackers_lock_; -AtomicInt<int64_t> MemTracker::released_memory_since_gc_; +AtomicInt64 MemTracker::released_memory_since_gc_; // Name for request pool MemTrackers. '$0' is replaced with the pool name. const string REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT = "RequestPool=$0"; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/mem-tracker.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h index 46f08c3..2fd23a7 100644 --- a/be/src/runtime/mem-tracker.h +++ b/be/src/runtime/mem-tracker.h @@ -394,7 +394,7 @@ class MemTracker { /// Total amount of memory from calls to Release() since the last GC. If this /// is greater than GC_RELEASE_SIZE, this will trigger a tcmalloc gc. - static AtomicInt<int64_t> released_memory_since_gc_; + static AtomicInt64 released_memory_since_gc_; /// Lock to protect GcMemory(). This prevents many GCs from occurring at once. boost::mutex gc_lock_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/plan-fragment-executor.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h index bb2f3a9..91cece6 100644 --- a/be/src/runtime/plan-fragment-executor.h +++ b/be/src/runtime/plan-fragment-executor.h @@ -231,7 +231,7 @@ class PlanFragmentExecutor { /// be fired. It is initialized to 0 and atomically swapped to 1 when a completed /// fragment report is about to be fired. Used for reducing the probability that a /// report is sent twice at the end of the fragment. - AtomicInt<int> completed_report_sent_; + AtomicInt32 completed_report_sent_; /// Sampled memory usage at even time intervals. RuntimeProfile::TimeSeriesCounter* mem_usage_sampled_counter_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/scheduling/query-resource-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-resource-mgr.cc b/be/src/scheduling/query-resource-mgr.cc index 6a5f8f6..e1e0581 100644 --- a/be/src/scheduling/query-resource-mgr.cc +++ b/be/src/scheduling/query-resource-mgr.cc @@ -97,8 +97,8 @@ void QueryResourceMgr::InitVcoreAcquisition(int32_t init_vcores) { // inspects immediately after exiting Expand(), and if true, exits before touching any // of the class-wide state (because the destructor may have finished before this point). - thread_in_expand_.reset(new AtomicInt<int32_t>()); - early_exit_.reset(new AtomicInt<int32_t>()); + thread_in_expand_.reset(new AtomicInt32()); + early_exit_.reset(new AtomicInt32()); acquire_vcore_thread_.reset( new Thread("resource-mgmt", Substitute("acquire-cpu-$0", PrintId(query_id_)), bind<void>(mem_fn(&QueryResourceMgr::AcquireVcoreResources), this, @@ -170,8 +170,8 @@ Status QueryResourceMgr::RequestMemExpansion(int64_t requested_bytes, } void QueryResourceMgr::AcquireVcoreResources( - shared_ptr<AtomicInt<int32_t> > thread_in_expand, - shared_ptr<AtomicInt<int32_t> > early_exit) { + shared_ptr<AtomicInt32> thread_in_expand, + shared_ptr<AtomicInt32> early_exit) { // Take a copy because we'd like to print it in some cases after the destructor. TUniqueId reservation_id = reservation_id_; VLOG_QUERY << "Starting Vcore acquisition for: " << reservation_id; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/scheduling/query-resource-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-resource-mgr.h b/be/src/scheduling/query-resource-mgr.h index 589e327..c6080e8 100644 --- a/be/src/scheduling/query-resource-mgr.h +++ b/be/src/scheduling/query-resource-mgr.h @@ -195,12 +195,12 @@ class QueryResourceMgr { /// parent QueryResourceMgr has been destroyed. /// TODO: Combine with ShouldExit(), and replace with AtomicBool when we have such a /// thing. - boost::shared_ptr<AtomicInt<int32_t> > early_exit_; + boost::shared_ptr<AtomicInt32> early_exit_; /// Signals to the destructor that the vcore acquisition thread is currently in an /// Expand() RPC. If so, the destructor does not need to wait for the acquisition thread /// to exit. - boost::shared_ptr<AtomicInt<int32_t> > thread_in_expand_; + boost::shared_ptr<AtomicInt32> thread_in_expand_; /// Creates the llama resource for the memory and/or cores specified, associated with /// the reservation context. @@ -209,8 +209,8 @@ class QueryResourceMgr { /// Run as a thread owned by acquire_cpu_thread_. Waits for notification from /// NotifyThreadUsageChange(), then checks the subscription level to decide if more /// VCores are needed, and starts a new expansion request if so. - void AcquireVcoreResources(boost::shared_ptr<AtomicInt<int32_t> > thread_in_expand, - boost::shared_ptr<AtomicInt<int32_t> > early_exit); + void AcquireVcoreResources(boost::shared_ptr<AtomicInt32 > thread_in_expand, + boost::shared_ptr<AtomicInt32> early_exit); /// True if thread:VCore subscription is too high, meaning more VCores are required. /// Must be called holding threads_running_ lock. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/counting-barrier.h ---------------------------------------------------------------------- diff --git a/be/src/util/counting-barrier.h b/be/src/util/counting-barrier.h index 917c8ec..72e7ac9 100644 --- a/be/src/util/counting-barrier.h +++ b/be/src/util/counting-barrier.h @@ -43,7 +43,7 @@ class CountingBarrier { Promise<bool> promise_; /// The number of pending notifications remaining. - AtomicInt<int32_t> count_; + AtomicInt32 count_; DISALLOW_COPY_AND_ASSIGN(CountingBarrier); }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/hdfs-bulk-ops.h ---------------------------------------------------------------------- diff --git a/be/src/util/hdfs-bulk-ops.h b/be/src/util/hdfs-bulk-ops.h index f543a06..432029f 100644 --- a/be/src/util/hdfs-bulk-ops.h +++ b/be/src/util/hdfs-bulk-ops.h @@ -125,7 +125,7 @@ class HdfsOperationSet { /// The number of ops remaining to be executed. Used to coordinate between executor /// threads so that when all ops are finished, promise_ is signalled. - AtomicInt<int64_t> num_ops_; + AtomicInt64 num_ops_; /// HDFS connection shared between all operations. Not owned by this class. hdfsFS* hdfs_connection_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/internal-queue-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/internal-queue-test.cc b/be/src/util/internal-queue-test.cc index 26e91a5..30bb1e8 100644 --- a/be/src/util/internal-queue-test.cc +++ b/be/src/util/internal-queue-test.cc @@ -150,7 +150,7 @@ const int VALIDATE_INTERVAL = 10000; // CHECK() is not thread safe so return the result in *failed. void ProducerThread(InternalQueue<IntNode>* queue, int num_inserts, - vector<IntNode>* nodes, AtomicInt<int32_t>* counter, bool* failed) { + vector<IntNode>* nodes, AtomicInt32* counter, bool* failed) { for (int i = 0; i < num_inserts && !*failed; ++i) { // Get the next index to queue. int32_t value = counter->Add(1) - 1; @@ -204,7 +204,7 @@ TEST(InternalQueue, TestClear) { TEST(InternalQueue, TestSingleProducerSingleConsumer) { vector<IntNode> nodes; - AtomicInt<int32_t> counter; + AtomicInt32 counter; nodes.resize(1000000); vector<int> results; @@ -233,7 +233,7 @@ TEST(InternalQueue, TestMultiProducerMultiConsumer) { bool failed = false; for (int num_producers = 1; num_producers < 5; num_producers += 3) { - AtomicInt<int32_t> counter; + AtomicInt32 counter; const int NUM_CONSUMERS = 4; ASSERT_EQ(nodes.size() % NUM_CONSUMERS, 0); ASSERT_EQ(nodes.size() % num_producers, 0); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/periodic-counter-updater.h ---------------------------------------------------------------------- diff --git a/be/src/util/periodic-counter-updater.h b/be/src/util/periodic-counter-updater.h index 660502e..6887a91 100644 --- a/be/src/util/periodic-counter-updater.h +++ b/be/src/util/periodic-counter-updater.h @@ -137,7 +137,7 @@ class PeriodicCounterUpdater { TimeSeriesCounters time_series_counters_; /// If 1, tear down the update thread. - AtomicInt<int32_t> done_; + AtomicInt32 done_; /// Singleton object that keeps track of all rate counters and the thread /// for updating them. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/progress-updater.h ---------------------------------------------------------------------- diff --git a/be/src/util/progress-updater.h b/be/src/util/progress-updater.h index d938221..774a45a 100644 --- a/be/src/util/progress-updater.h +++ b/be/src/util/progress-updater.h @@ -75,10 +75,10 @@ class ProgressUpdater { int update_period_; /// Number of completed work items. - AtomicInt<int64_t> num_complete_; + AtomicInt64 num_complete_; /// Percentage when the last output was generated. - AtomicInt<int> last_output_percentage_; + AtomicInt32 last_output_percentage_; }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/runtime-profile.h ---------------------------------------------------------------------- diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h index 6858c5d..6695b65 100644 --- a/be/src/util/runtime-profile.h +++ b/be/src/util/runtime-profile.h @@ -125,7 +125,7 @@ class RuntimeProfile { protected: friend class RuntimeProfile; - AtomicInt<int64_t> value_; + AtomicInt64 value_; TUnit::type unit_; }; @@ -175,7 +175,7 @@ class RuntimeProfile { /// The current value of the counter. value_ in the super class represents /// the high water mark. - AtomicInt<int64_t> current_value_; + AtomicInt64 current_value_; }; typedef boost::function<int64_t ()> DerivedCounterFunction;
