IMPALA-4669: [KUTIL] Import kudu_util library from kudu@314c9d8 Update LICENSE.txt and rat_exclude_files.txt
Change-Id: I6d89384730b60354b5fae2b1472183d2a561d170 Reviewed-on: http://gerrit.cloudera.org:8080/5714 Reviewed-by: Henry Robinson <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d6abb29d Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d6abb29d Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d6abb29d Branch: refs/heads/master Commit: d6abb29dc915521b98a32cadbe82eb02f46c37da Parents: d062257 Author: Henry Robinson <[email protected]> Authored: Tue Oct 25 14:57:17 2016 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Sat Jun 17 00:42:48 2017 +0000 ---------------------------------------------------------------------- LICENSE.txt | 35 + be/src/kudu/util/CMakeLists.txt | 464 ++++ be/src/kudu/util/alignment.h | 28 + be/src/kudu/util/async_logger.cc | 153 ++ be/src/kudu/util/async_logger.h | 202 ++ be/src/kudu/util/async_util.h | 72 + be/src/kudu/util/atomic-test.cc | 134 + be/src/kudu/util/atomic.cc | 55 + be/src/kudu/util/atomic.h | 320 +++ be/src/kudu/util/auto_release_pool.h | 99 + be/src/kudu/util/barrier.h | 68 + be/src/kudu/util/bit-stream-utils.h | 150 ++ be/src/kudu/util/bit-stream-utils.inline.h | 211 ++ be/src/kudu/util/bit-util-test.cc | 45 + be/src/kudu/util/bit-util.h | 57 + be/src/kudu/util/bitmap-test.cc | 223 ++ be/src/kudu/util/bitmap.cc | 132 + be/src/kudu/util/bitmap.h | 212 ++ be/src/kudu/util/blocking_queue-test.cc | 245 ++ be/src/kudu/util/blocking_queue.h | 255 ++ be/src/kudu/util/bloom_filter-test.cc | 87 + be/src/kudu/util/bloom_filter.cc | 86 + be/src/kudu/util/bloom_filter.h | 249 ++ be/src/kudu/util/boost_mutex_utils.h | 45 + be/src/kudu/util/cache-test.cc | 234 ++ be/src/kudu/util/cache.cc | 512 ++++ be/src/kudu/util/cache.h | 207 ++ be/src/kudu/util/cache_metrics.cc | 69 + be/src/kudu/util/cache_metrics.h | 47 + be/src/kudu/util/callback_bind-test.cc | 110 + be/src/kudu/util/coding-inl.h | 117 + be/src/kudu/util/coding.cc | 141 + be/src/kudu/util/coding.h | 110 + .../kudu/util/compression/compression-test.cc | 88 + be/src/kudu/util/compression/compression.proto | 29 + .../kudu/util/compression/compression_codec.cc | 283 ++ .../kudu/util/compression/compression_codec.h | 75 + be/src/kudu/util/condition_variable.cc | 140 + be/src/kudu/util/condition_variable.h | 113 + be/src/kudu/util/countdown_latch-test.cc | 71 + be/src/kudu/util/countdown_latch.h | 138 + be/src/kudu/util/cow_object.h | 219 ++ be/src/kudu/util/crc-test.cc | 103 + be/src/kudu/util/crc.cc | 56 + be/src/kudu/util/crc.h | 43 + be/src/kudu/util/curl_util.cc | 102 + be/src/kudu/util/curl_util.h | 80 + be/src/kudu/util/debug-util-test.cc | 147 ++ be/src/kudu/util/debug-util.cc | 428 ++++ be/src/kudu/util/debug-util.h | 171 ++ be/src/kudu/util/debug/leak_annotations.h | 84 + be/src/kudu/util/debug/leakcheck_disabler.h | 48 + be/src/kudu/util/debug/sanitizer_scopes.h | 47 + be/src/kudu/util/debug/trace_event.h | 1500 +++++++++++ be/src/kudu/util/debug/trace_event_impl.cc | 2416 ++++++++++++++++++ be/src/kudu/util/debug/trace_event_impl.h | 718 ++++++ .../util/debug/trace_event_impl_constants.cc | 14 + be/src/kudu/util/debug/trace_event_memory.h | 28 + .../util/debug/trace_event_synthetic_delay.cc | 230 ++ .../util/debug/trace_event_synthetic_delay.h | 162 ++ be/src/kudu/util/debug/trace_logging.h | 118 + be/src/kudu/util/debug_ref_counted.h | 56 + be/src/kudu/util/env-test.cc | 981 +++++++ be/src/kudu/util/env.cc | 90 + be/src/kudu/util/env.h | 643 +++++ be/src/kudu/util/env_posix.cc | 1608 ++++++++++++ be/src/kudu/util/env_util-test.cc | 169 ++ be/src/kudu/util/env_util.cc | 290 +++ be/src/kudu/util/env_util.h | 109 + be/src/kudu/util/errno-test.cc | 49 + be/src/kudu/util/errno.cc | 52 + be/src/kudu/util/errno.h | 35 + be/src/kudu/util/failure_detector-test.cc | 112 + be/src/kudu/util/failure_detector.cc | 214 ++ be/src/kudu/util/failure_detector.h | 179 ++ be/src/kudu/util/faststring-test.cc | 60 + be/src/kudu/util/faststring.cc | 72 + be/src/kudu/util/faststring.h | 256 ++ be/src/kudu/util/fault_injection.cc | 75 + be/src/kudu/util/fault_injection.h | 88 + be/src/kudu/util/file_cache-stress-test.cc | 390 +++ be/src/kudu/util/file_cache-test-util.h | 84 + be/src/kudu/util/file_cache-test.cc | 306 +++ be/src/kudu/util/file_cache.cc | 629 +++++ be/src/kudu/util/file_cache.h | 186 ++ be/src/kudu/util/flag_tags-test.cc | 128 + be/src/kudu/util/flag_tags.cc | 88 + be/src/kudu/util/flag_tags.h | 170 ++ be/src/kudu/util/flag_validators-test.cc | 245 ++ be/src/kudu/util/flag_validators.cc | 67 + be/src/kudu/util/flag_validators.h | 102 + be/src/kudu/util/flags-test.cc | 100 + be/src/kudu/util/flags.cc | 555 ++++ be/src/kudu/util/flags.h | 74 + be/src/kudu/util/group_varint-inl.h | 268 ++ be/src/kudu/util/group_varint-test.cc | 135 + be/src/kudu/util/group_varint.cc | 78 + be/src/kudu/util/hash_util-test.cc | 40 + be/src/kudu/util/hash_util.h | 68 + be/src/kudu/util/hdr_histogram-test.cc | 113 + be/src/kudu/util/hdr_histogram.cc | 497 ++++ be/src/kudu/util/hdr_histogram.h | 351 +++ be/src/kudu/util/hexdump.cc | 80 + be/src/kudu/util/hexdump.h | 34 + be/src/kudu/util/high_water_mark.h | 85 + be/src/kudu/util/histogram.proto | 48 + be/src/kudu/util/init.cc | 86 + be/src/kudu/util/init.h | 34 + be/src/kudu/util/inline_slice-test.cc | 84 + be/src/kudu/util/inline_slice.h | 182 ++ be/src/kudu/util/interval_tree-inl.h | 444 ++++ be/src/kudu/util/interval_tree-test.cc | 347 +++ be/src/kudu/util/interval_tree.h | 158 ++ be/src/kudu/util/jsonreader-test.cc | 170 ++ be/src/kudu/util/jsonreader.cc | 124 + be/src/kudu/util/jsonreader.h | 89 + be/src/kudu/util/jsonwriter-test.cc | 159 ++ be/src/kudu/util/jsonwriter.cc | 327 +++ be/src/kudu/util/jsonwriter.h | 98 + be/src/kudu/util/jsonwriter_test.proto | 79 + be/src/kudu/util/kernel_stack_watchdog.cc | 199 ++ be/src/kudu/util/kernel_stack_watchdog.h | 267 ++ be/src/kudu/util/knapsack_solver-test.cc | 169 ++ be/src/kudu/util/knapsack_solver.h | 269 ++ be/src/kudu/util/locks.cc | 42 + be/src/kudu/util/locks.h | 285 +++ be/src/kudu/util/logging-test.cc | 222 ++ be/src/kudu/util/logging.cc | 397 +++ be/src/kudu/util/logging.h | 359 +++ be/src/kudu/util/logging_callback.h | 46 + be/src/kudu/util/logging_test_util.h | 60 + be/src/kudu/util/maintenance_manager-test.cc | 321 +++ be/src/kudu/util/maintenance_manager.cc | 503 ++++ be/src/kudu/util/maintenance_manager.h | 324 +++ be/src/kudu/util/maintenance_manager.proto | 49 + be/src/kudu/util/make_shared.h | 64 + be/src/kudu/util/malloc.cc | 35 + be/src/kudu/util/malloc.h | 32 + be/src/kudu/util/map-util-test.cc | 103 + be/src/kudu/util/mem_tracker-test.cc | 278 ++ be/src/kudu/util/mem_tracker.cc | 291 +++ be/src/kudu/util/mem_tracker.h | 274 ++ be/src/kudu/util/memcmpable_varint-test.cc | 207 ++ be/src/kudu/util/memcmpable_varint.cc | 257 ++ be/src/kudu/util/memcmpable_varint.h | 43 + be/src/kudu/util/memory/arena-test.cc | 202 ++ be/src/kudu/util/memory/arena.cc | 161 ++ be/src/kudu/util/memory/arena.h | 473 ++++ be/src/kudu/util/memory/memory.cc | 338 +++ be/src/kudu/util/memory/memory.h | 976 +++++++ be/src/kudu/util/memory/overwrite.cc | 43 + be/src/kudu/util/memory/overwrite.h | 31 + be/src/kudu/util/metrics-test.cc | 309 +++ be/src/kudu/util/metrics.cc | 686 +++++ be/src/kudu/util/metrics.h | 1081 ++++++++ be/src/kudu/util/minidump-test.cc | 144 ++ be/src/kudu/util/minidump.cc | 378 +++ be/src/kudu/util/minidump.h | 105 + be/src/kudu/util/monotime-test.cc | 419 +++ be/src/kudu/util/monotime.cc | 325 +++ be/src/kudu/util/monotime.h | 412 +++ be/src/kudu/util/mt-hdr_histogram-test.cc | 111 + be/src/kudu/util/mt-metrics-test.cc | 121 + be/src/kudu/util/mt-threadlocal-test.cc | 349 +++ be/src/kudu/util/mutex.cc | 157 ++ be/src/kudu/util/mutex.h | 142 + be/src/kudu/util/net/dns_resolver-test.cc | 55 + be/src/kudu/util/net/dns_resolver.cc | 63 + be/src/kudu/util/net/dns_resolver.h | 63 + be/src/kudu/util/net/net_util-test.cc | 162 ++ be/src/kudu/util/net/net_util.cc | 383 +++ be/src/kudu/util/net/net_util.h | 164 ++ be/src/kudu/util/net/sockaddr.cc | 137 + be/src/kudu/util/net/sockaddr.h | 89 + be/src/kudu/util/net/socket.cc | 582 +++++ be/src/kudu/util/net/socket.h | 162 ++ be/src/kudu/util/nvm_cache.cc | 578 +++++ be/src/kudu/util/nvm_cache.h | 30 + be/src/kudu/util/object_pool-test.cc | 84 + be/src/kudu/util/object_pool.h | 168 ++ be/src/kudu/util/oid_generator-test.cc | 50 + be/src/kudu/util/oid_generator.cc | 62 + be/src/kudu/util/oid_generator.h | 61 + be/src/kudu/util/once-test.cc | 110 + be/src/kudu/util/once.cc | 32 + be/src/kudu/util/once.h | 110 + be/src/kudu/util/os-util-test.cc | 59 + be/src/kudu/util/os-util.cc | 142 + be/src/kudu/util/os-util.h | 65 + be/src/kudu/util/path_util-test.cc | 77 + be/src/kudu/util/path_util.cc | 81 + be/src/kudu/util/path_util.h | 49 + be/src/kudu/util/pb_util-internal.cc | 102 + be/src/kudu/util/pb_util-internal.h | 130 + be/src/kudu/util/pb_util-test.cc | 612 +++++ be/src/kudu/util/pb_util.cc | 956 +++++++ be/src/kudu/util/pb_util.h | 492 ++++ be/src/kudu/util/pb_util.proto | 45 + be/src/kudu/util/pb_util_test.proto | 29 + be/src/kudu/util/process_memory-test.cc | 71 + be/src/kudu/util/process_memory.cc | 275 ++ be/src/kudu/util/process_memory.h | 56 + be/src/kudu/util/promise.h | 79 + be/src/kudu/util/proto_container_test.proto | 25 + be/src/kudu/util/proto_container_test2.proto | 29 + be/src/kudu/util/proto_container_test3.proto | 33 + be/src/kudu/util/protobuf-annotations.h | 33 + be/src/kudu/util/protobuf_util.h | 39 + be/src/kudu/util/protoc-gen-insertions.cc | 72 + be/src/kudu/util/pstack_watcher-test.cc | 85 + be/src/kudu/util/pstack_watcher.cc | 195 ++ be/src/kudu/util/pstack_watcher.h | 93 + be/src/kudu/util/random-test.cc | 164 ++ be/src/kudu/util/random.h | 252 ++ be/src/kudu/util/random_util-test.cc | 73 + be/src/kudu/util/random_util.cc | 53 + be/src/kudu/util/random_util.h | 39 + be/src/kudu/util/resettable_heartbeater-test.cc | 104 + be/src/kudu/util/resettable_heartbeater.cc | 179 ++ be/src/kudu/util/resettable_heartbeater.h | 79 + be/src/kudu/util/rle-encoding.h | 523 ++++ be/src/kudu/util/rle-test.cc | 537 ++++ be/src/kudu/util/rolling_log-test.cc | 120 + be/src/kudu/util/rolling_log.cc | 258 ++ be/src/kudu/util/rolling_log.h | 107 + be/src/kudu/util/rw_mutex-test.cc | 182 ++ be/src/kudu/util/rw_mutex.cc | 197 ++ be/src/kudu/util/rw_mutex.h | 119 + be/src/kudu/util/rw_semaphore-test.cc | 91 + be/src/kudu/util/rw_semaphore.h | 203 ++ be/src/kudu/util/rwc_lock-test.cc | 143 ++ be/src/kudu/util/rwc_lock.cc | 123 + be/src/kudu/util/rwc_lock.h | 136 + be/src/kudu/util/safe_math-test.cc | 56 + be/src/kudu/util/safe_math.h | 69 + be/src/kudu/util/scoped_cleanup-test.cc | 45 + be/src/kudu/util/scoped_cleanup.h | 51 + be/src/kudu/util/semaphore.cc | 95 + be/src/kudu/util/semaphore.h | 76 + be/src/kudu/util/semaphore_macosx.cc | 72 + be/src/kudu/util/signal.cc | 47 + be/src/kudu/util/signal.h | 42 + be/src/kudu/util/slice-test.cc | 56 + be/src/kudu/util/slice.cc | 74 + be/src/kudu/util/slice.h | 317 +++ be/src/kudu/util/spinlock_profiling-test.cc | 85 + be/src/kudu/util/spinlock_profiling.cc | 348 +++ be/src/kudu/util/spinlock_profiling.h | 76 + be/src/kudu/util/stack_watchdog-test.cc | 104 + be/src/kudu/util/status-test.cc | 98 + be/src/kudu/util/status.cc | 162 ++ be/src/kudu/util/status.h | 433 ++++ be/src/kudu/util/status_callback.cc | 36 + be/src/kudu/util/status_callback.h | 48 + be/src/kudu/util/stopwatch.h | 342 +++ be/src/kudu/util/string_case-test.cc | 63 + be/src/kudu/util/string_case.cc | 73 + be/src/kudu/util/string_case.h | 48 + be/src/kudu/util/striped64-test.cc | 152 ++ be/src/kudu/util/striped64.cc | 177 ++ be/src/kudu/util/striped64.h | 174 ++ be/src/kudu/util/subprocess-test.cc | 264 ++ be/src/kudu/util/subprocess.cc | 707 +++++ be/src/kudu/util/subprocess.h | 191 ++ be/src/kudu/util/test_graph.cc | 117 + be/src/kudu/util/test_graph.h | 87 + be/src/kudu/util/test_macros.h | 123 + be/src/kudu/util/test_main.cc | 107 + be/src/kudu/util/test_util.cc | 285 +++ be/src/kudu/util/test_util.h | 113 + be/src/kudu/util/test_util_prod.cc | 28 + be/src/kudu/util/test_util_prod.h | 32 + be/src/kudu/util/thread-test.cc | 152 ++ be/src/kudu/util/thread.cc | 617 +++++ be/src/kudu/util/thread.h | 362 +++ be/src/kudu/util/thread_restrictions.cc | 85 + be/src/kudu/util/thread_restrictions.h | 121 + be/src/kudu/util/threadlocal.cc | 71 + be/src/kudu/util/threadlocal.h | 143 ++ be/src/kudu/util/threadlocal_cache.h | 110 + be/src/kudu/util/threadpool-test.cc | 367 +++ be/src/kudu/util/threadpool.cc | 410 +++ be/src/kudu/util/threadpool.h | 257 ++ be/src/kudu/util/throttler-test.cc | 77 + be/src/kudu/util/throttler.cc | 66 + be/src/kudu/util/throttler.h | 63 + be/src/kudu/util/trace-test.cc | 845 ++++++ be/src/kudu/util/trace.cc | 261 ++ be/src/kudu/util/trace.h | 308 +++ be/src/kudu/util/trace_metrics.cc | 69 + be/src/kudu/util/trace_metrics.h | 105 + be/src/kudu/util/url-coding-test.cc | 107 + be/src/kudu/util/url-coding.cc | 203 ++ be/src/kudu/util/url-coding.h | 70 + be/src/kudu/util/user-test.cc | 42 + be/src/kudu/util/user.cc | 68 + be/src/kudu/util/user.h | 32 + be/src/kudu/util/version_info.cc | 79 + be/src/kudu/util/version_info.h | 48 + be/src/kudu/util/version_info.proto | 32 + be/src/kudu/util/web_callback_registry.h | 68 + be/src/kudu/util/zlib.cc | 122 + be/src/kudu/util/zlib.h | 39 + bin/rat_exclude_files.txt | 23 + 304 files changed, 58970 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/LICENSE.txt ---------------------------------------------------------------------- diff --git a/LICENSE.txt b/LICENSE.txt index 4cda0f2..e5e6611 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -730,3 +730,38 @@ cmake_modules/FindJNI.cmake: 3-clause BSD OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- + +be/src/kudu/util (some portions): 3-clause BSD license + +Some portions of this module are derived from code from LevelDB +( https://github.com/google/leveldb ): + + Copyright (c) 2011 The LevelDB Authors. All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following disclaimer + in the documentation and/or other materials provided with the + distribution. + * Neither the name of Google Inc. nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +-------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/CMakeLists.txt b/be/src/kudu/util/CMakeLists.txt new file mode 100644 index 0000000..e4fff0c --- /dev/null +++ b/be/src/kudu/util/CMakeLists.txt @@ -0,0 +1,464 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +####################################### +# util_compression_proto +####################################### + +PROTOBUF_GENERATE_CPP( + UTIL_COMPRESSION_PROTO_SRCS UTIL_COMPRESSION_PROTO_HDRS UTIL_COMPRESSION_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. + PROTO_FILES compression/compression.proto) +ADD_EXPORTABLE_LIBRARY(util_compression_proto + SRCS ${UTIL_COMPRESSION_PROTO_SRCS} + DEPS protobuf + NONLINK_DEPS ${UTIL_COMPRESSION_PROTO_TGTS}) + +####################################### +# histogram_proto +####################################### + +PROTOBUF_GENERATE_CPP( + HISTOGRAM_PROTO_SRCS HISTOGRAM_PROTO_HDRS HISTOGRAM_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. + PROTO_FILES histogram.proto) +ADD_EXPORTABLE_LIBRARY(histogram_proto + SRCS ${HISTOGRAM_PROTO_SRCS} + DEPS protobuf + NONLINK_DEPS ${HISTOGRAM_PROTO_TGTS}) + +####################################### +# maintenance_manager_proto +####################################### + +PROTOBUF_GENERATE_CPP( + MAINTENANCE_MANAGER_PROTO_SRCS MAINTENANCE_MANAGER_PROTO_HDRS MAINTENANCE_MANAGER_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. + PROTO_FILES maintenance_manager.proto) +ADD_EXPORTABLE_LIBRARY(maintenance_manager_proto + SRCS ${MAINTENANCE_MANAGER_PROTO_SRCS} + DEPS protobuf + NONLINK_DEPS ${MAINTENANCE_MANAGER_PROTO_TGTS}) + +####################################### +# pb_util_proto +####################################### + +PROTOBUF_GENERATE_CPP( + PB_UTIL_PROTO_SRCS PB_UTIL_PROTO_HDRS PB_UTIL_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. + PROTO_FILES pb_util.proto) +ADD_EXPORTABLE_LIBRARY(pb_util_proto + SRCS ${PB_UTIL_PROTO_SRCS} + DEPS protobuf + NONLINK_DEPS ${PB_UTIL_PROTO_TGTS}) + +####################################### +# version_info_proto +####################################### + +PROTOBUF_GENERATE_CPP( + VERSION_INFO_PROTO_SRCS VERSION_INFO_PROTO_HDRS VERSION_INFO_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. + PROTO_FILES version_info.proto) +ADD_EXPORTABLE_LIBRARY(version_info_proto + SRCS ${VERSION_INFO_PROTO_SRCS} + DEPS protobuf + NONLINK_DEPS ${VERSION_INFO_PROTO_TGTS}) + +############################################################ +# Version stamp +############################################################ + +# Unlike CMAKE_CURRENT_BINARY_DIR, CMAKE_BINARY_DIR is always the root of +# the build directory. +set(VERSION_STAMP_FILE ${CMAKE_BINARY_DIR}/src/kudu/generated/version_defines.h) + +list(APPEND GEN_VERSION_INFO_COMMAND "${BUILD_SUPPORT_DIR}/gen_version_info.py") +list(APPEND GEN_VERSION_INFO_COMMAND "--version=${KUDU_VERSION_NUMBER}") +list(APPEND GEN_VERSION_INFO_COMMAND "--build-type=${CMAKE_BUILD_TYPE}") +if(KUDU_GIT_HASH) + message(STATUS "Provided git hash: ${KUDU_GIT_HASH}") + list(APPEND GEN_VERSION_INFO_COMMAND "--git-hash=${KUDU_GIT_HASH}") +endif() +list(APPEND GEN_VERSION_INFO_COMMAND "${VERSION_STAMP_FILE}") +add_custom_target(gen_version_info + COMMAND ${GEN_VERSION_INFO_COMMAND} + BYPRODUCTS "${VERSION_STAMP_FILE}") + +####################################### +# kudu_util +####################################### + +if (APPLE) + set(SEMAPHORE_CC "semaphore_macosx.cc") +else () + set(SEMAPHORE_CC "semaphore.cc") +endif() + +set(UTIL_SRCS + async_logger.cc + atomic.cc + bitmap.cc + bloom_filter.cc + bitmap.cc + cache.cc + cache_metrics.cc + coding.cc + condition_variable.cc + crc.cc + debug-util.cc + debug/trace_event_impl.cc + debug/trace_event_impl_constants.cc + debug/trace_event_synthetic_delay.cc + env.cc env_posix.cc env_util.cc + errno.cc + faststring.cc + failure_detector.cc + fault_injection.cc + file_cache.cc + flags.cc + flag_tags.cc + flag_validators.cc + group_varint.cc + pstack_watcher.cc + hdr_histogram.cc + hexdump.cc + init.cc + jsonreader.cc + jsonwriter.cc + kernel_stack_watchdog.cc + locks.cc + logging.cc + maintenance_manager.cc + malloc.cc + memcmpable_varint.cc + memory/arena.cc + memory/memory.cc + memory/overwrite.cc + mem_tracker.cc + metrics.cc + minidump.cc + monotime.cc + mutex.cc + net/dns_resolver.cc + net/net_util.cc + net/sockaddr.cc + net/socket.cc + oid_generator.cc + once.cc + os-util.cc + path_util.cc + pb_util.cc + pb_util-internal.cc + process_memory.cc + random_util.cc + resettable_heartbeater.cc + rolling_log.cc + rw_mutex.cc + rwc_lock.cc + ${SEMAPHORE_CC} + signal.cc + slice.cc + spinlock_profiling.cc + status.cc + status_callback.cc + string_case.cc + striped64.cc + subprocess.cc + test_graph.cc + test_util_prod.cc + thread.cc + threadlocal.cc + threadpool.cc + thread_restrictions.cc + throttler.cc + trace.cc + trace_metrics.cc + user.cc + url-coding.cc + version_info.cc + zlib.cc +) + +# overwrite.cc contains a single function which would be a hot spot in +# debug builds. It's separated into a separate file so it can be +# optimized regardless of the default optimization options. +set_source_files_properties(memory/overwrite.cc PROPERTIES COMPILE_FLAGS "-O3") + +if(NOT APPLE) + set(UTIL_SRCS + ${UTIL_SRCS} + nvm_cache.cc) +endif() + +set(UTIL_LIBS + crcutil + gflags + glog + gutil + histogram_proto + libev + maintenance_manager_proto + pb_util_proto + protobuf + version_info_proto + zlib) + +if(NOT APPLE) + set(UTIL_LIBS + ${UTIL_LIBS} + breakpad_client + dl + rt + vmem) +endif() + +# We use MallocExtension, but not in the exported version of the library. +set(EXPORTED_UTIL_LIBS ${UTIL_LIBS}) +if(${KUDU_TCMALLOC_AVAILABLE}) + list(APPEND UTIL_LIBS tcmalloc) +endif() + +ADD_EXPORTABLE_LIBRARY(kudu_util + SRCS ${UTIL_SRCS} + DEPS ${UTIL_LIBS} + NONLINK_DEPS gen_version_info + EXPORTED_DEPS ${EXPORTED_UTIL_LIBS}) + +####################################### +# kudu_util_compression +####################################### +set(UTIL_COMPRESSION_SRCS + compression/compression_codec.cc) +set(UTIL_COMPRESSION_LIBS + kudu_util + util_compression_proto + + glog + gutil + lz4 + snappy + zlib) +ADD_EXPORTABLE_LIBRARY(kudu_util_compression + SRCS ${UTIL_COMPRESSION_SRCS} + DEPS ${UTIL_COMPRESSION_LIBS}) + +####################################### +# kudu_test_util +####################################### + +if(NOT NO_TESTS) + add_library(kudu_test_util + test_util.cc) + target_link_libraries(kudu_test_util + gflags + glog + gmock + kudu_util) + + if(NOT APPLE) + target_link_libraries(kudu_test_util + vmem) + endif() + +####################################### +# kudu_curl_util +####################################### + add_library(kudu_curl_util + curl_util.cc) + target_link_libraries(kudu_curl_util + security + ${CURL_LIBRARIES} + glog + gutil) + +####################################### +# kudu_test_main +####################################### + + add_library(kudu_test_main + test_main.cc) + target_link_libraries(kudu_test_main + ${KRB5_REALM_OVERRIDE} + gflags + glog + gmock + kudu_util + kudu_test_util) + + if(NOT APPLE) + target_link_libraries(kudu_test_main + dl + rt) + endif() +endif() + +####################################### +# protoc-gen-insertions +####################################### + +add_executable(protoc-gen-insertions protoc-gen-insertions.cc) +target_link_libraries(protoc-gen-insertions gutil protobuf protoc ${KUDU_BASE_LIBS}) + +####################################### +# Unit tests +####################################### + +set(KUDU_TEST_LINK_LIBS kudu_util gutil ${KUDU_MIN_TEST_LIBS}) +ADD_KUDU_TEST(atomic-test) +ADD_KUDU_TEST(bit-util-test) +ADD_KUDU_TEST(bitmap-test) +ADD_KUDU_TEST(blocking_queue-test) +ADD_KUDU_TEST(bloom_filter-test) +ADD_KUDU_TEST(cache-test) +ADD_KUDU_TEST(callback_bind-test) +ADD_KUDU_TEST(countdown_latch-test) +ADD_KUDU_TEST(crc-test RUN_SERIAL true) # has a benchmark +ADD_KUDU_TEST(debug-util-test) +ADD_KUDU_TEST(env-test LABELS no_tsan) +ADD_KUDU_TEST(env_util-test) +ADD_KUDU_TEST(errno-test) +ADD_KUDU_TEST(failure_detector-test) +ADD_KUDU_TEST(faststring-test) +ADD_KUDU_TEST(file_cache-test) +ADD_KUDU_TEST(file_cache-stress-test RUN_SERIAL true) +ADD_KUDU_TEST(flag_tags-test) +ADD_KUDU_TEST(flag_validators-test) +ADD_KUDU_TEST(flags-test) +ADD_KUDU_TEST(group_varint-test) +ADD_KUDU_TEST(hash_util-test) +ADD_KUDU_TEST(hdr_histogram-test) +ADD_KUDU_TEST(inline_slice-test) +ADD_KUDU_TEST(interval_tree-test) +ADD_KUDU_TEST(jsonreader-test) +ADD_KUDU_TEST(knapsack_solver-test) +ADD_KUDU_TEST(logging-test) +ADD_KUDU_TEST(maintenance_manager-test) +ADD_KUDU_TEST(map-util-test) +ADD_KUDU_TEST(mem_tracker-test) +ADD_KUDU_TEST(memcmpable_varint-test LABELS no_tsan) +ADD_KUDU_TEST(memory/arena-test) +ADD_KUDU_TEST(metrics-test) +ADD_KUDU_TEST(monotime-test) +ADD_KUDU_TEST(mt-hdr_histogram-test RUN_SERIAL true) +ADD_KUDU_TEST(mt-metrics-test RUN_SERIAL true) +ADD_KUDU_TEST(mt-threadlocal-test RUN_SERIAL true) +ADD_KUDU_TEST(net/dns_resolver-test) +ADD_KUDU_TEST(net/net_util-test) +ADD_KUDU_TEST(object_pool-test) +ADD_KUDU_TEST(oid_generator-test) +ADD_KUDU_TEST(once-test) +ADD_KUDU_TEST(os-util-test) +ADD_KUDU_TEST(path_util-test) +ADD_KUDU_TEST(process_memory-test RUN_SERIAL true) +ADD_KUDU_TEST(random-test) +ADD_KUDU_TEST(random_util-test) +ADD_KUDU_TEST(resettable_heartbeater-test) +ADD_KUDU_TEST(rle-test) +ADD_KUDU_TEST(rolling_log-test) +ADD_KUDU_TEST(rw_mutex-test) +ADD_KUDU_TEST(rw_semaphore-test) +ADD_KUDU_TEST(rwc_lock-test) +ADD_KUDU_TEST(safe_math-test) +ADD_KUDU_TEST(scoped_cleanup-test) +ADD_KUDU_TEST(slice-test) +ADD_KUDU_TEST(spinlock_profiling-test) +ADD_KUDU_TEST(stack_watchdog-test) +ADD_KUDU_TEST(status-test) +ADD_KUDU_TEST(string_case-test) +ADD_KUDU_TEST(striped64-test) +ADD_KUDU_TEST(subprocess-test) +ADD_KUDU_TEST(thread-test) +ADD_KUDU_TEST(threadpool-test) +ADD_KUDU_TEST(throttler-test) +ADD_KUDU_TEST(trace-test) +ADD_KUDU_TEST(url-coding-test) +ADD_KUDU_TEST(user-test) + +if (NOT APPLE) + ADD_KUDU_TEST(minidump-test) + ADD_KUDU_TEST(pstack_watcher-test) +endif() + +####################################### +# jsonwriter_test_proto +####################################### + +PROTOBUF_GENERATE_CPP( + JSONWRITER_TEST_PROTO_SRCS JSONWRITER_TEST_PROTO_HDRS JSONWRITER_TEST_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. + PROTO_FILES jsonwriter_test.proto) +add_library(jsonwriter_test_proto ${JSONWRITER_TEST_PROTO_SRCS} ${JSONWRITER_TEST_PROTO_HDRS}) +target_link_libraries(jsonwriter_test_proto + pb_util_proto + protobuf) + +####################################### +# jsonwriter-test +####################################### + +ADD_KUDU_TEST(jsonwriter-test) +if(NOT NO_TESTS) + target_link_libraries(jsonwriter-test + jsonwriter_test_proto) +endif() + +####################################### +# pb_util_test_proto +####################################### + +PROTOBUF_GENERATE_CPP( + PROTO_CONTAINER_TEST_PROTO_SRCS PROTO_CONTAINER_TEST_PROTO_HDRS PROTO_CONTAINER_TEST_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. + PROTO_FILES + proto_container_test.proto + proto_container_test2.proto + proto_container_test3.proto + pb_util_test.proto) +add_library(pb_util_test_proto + ${PROTO_CONTAINER_TEST_PROTO_SRCS} + ${PROTO_CONTAINER_TEST_PROTO_HDRS}) +target_link_libraries(pb_util_test_proto + pb_util_proto + protobuf) + +####################################### +# pb_util-test +####################################### + +ADD_KUDU_TEST(pb_util-test) +if(NOT NO_TESTS) + target_link_libraries(pb_util-test + pb_util_test_proto) +endif() + +####################################### +# util/compression tests +####################################### +ADD_KUDU_TEST(compression/compression-test) +if(NOT NO_TESTS) + target_link_libraries(compression-test + kudu_util_compression) +endif() http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/alignment.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/alignment.h b/be/src/kudu/util/alignment.h new file mode 100644 index 0000000..f475373 --- /dev/null +++ b/be/src/kudu/util/alignment.h @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Macros for dealing with memory alignment. +#ifndef KUDU_UTIL_ALIGNMENT_H +#define KUDU_UTIL_ALIGNMENT_H + +// Round down 'x' to the nearest 'align' boundary +#define KUDU_ALIGN_DOWN(x, align) ((x) & (-(align))) + +// Round up 'x' to the nearest 'align' boundary +#define KUDU_ALIGN_UP(x, align) (((x) + ((align) - 1)) & (-(align))) + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/async_logger.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/async_logger.cc b/be/src/kudu/util/async_logger.cc new file mode 100644 index 0000000..8e6a4a9 --- /dev/null +++ b/be/src/kudu/util/async_logger.cc @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/async_logger.h" + +#include <algorithm> +#include <string> +#include <thread> + +#include "kudu/util/locks.h" +#include "kudu/util/monotime.h" + +using std::string; + +namespace kudu { + +AsyncLogger::AsyncLogger(google::base::Logger* wrapped, + int max_buffer_bytes) : + max_buffer_bytes_(max_buffer_bytes), + wrapped_(DCHECK_NOTNULL(wrapped)), + wake_flusher_cond_(&lock_), + free_buffer_cond_(&lock_), + flush_complete_cond_(&lock_), + active_buf_(new Buffer()), + flushing_buf_(new Buffer()) { + DCHECK_GT(max_buffer_bytes_, 0); +} + +AsyncLogger::~AsyncLogger() {} + +void AsyncLogger::Start() { + CHECK_EQ(state_, INITTED); + state_ = RUNNING; + thread_ = std::thread(&AsyncLogger::RunThread, this); +} + +void AsyncLogger::Stop() { + { + MutexLock l(lock_); + CHECK_EQ(state_, RUNNING); + state_ = STOPPED; + wake_flusher_cond_.Signal(); + } + thread_.join(); + CHECK(active_buf_->messages.empty()); + CHECK(flushing_buf_->messages.empty()); +} + +void AsyncLogger::Write(bool force_flush, + time_t timestamp, + const char* message, + int message_len) { + { + MutexLock l(lock_); + DCHECK_EQ(state_, RUNNING); + while (BufferFull(*active_buf_)) { + app_threads_blocked_count_for_tests_++; + free_buffer_cond_.Wait(); + } + active_buf_->add(Msg(timestamp, string(message, message_len)), + force_flush); + wake_flusher_cond_.Signal(); + } + + // In most cases, we take the 'force_flush' argument to mean that we'll let the logger + // thread do the flushing for us, but not block the application. However, for the + // special case of a FATAL log message, we really want to make sure that our message + // hits the log before we continue, or else it's likely that the application will exit + // while it's still in our buffer. + // + // NOTE: even if the application doesn't wrap the FATAL-level logger, log messages at + // FATAL are also written to all other log files with lower levels. So, a FATAL message + // will force a synchronous flush of all lower-level logs before exiting. + // + // Unfortunately, the underlying log level isn't passed through to this interface, so we + // have to use this hack: messages from FATAL errors start with the character 'F'. + if (message_len > 0 && message[0] == 'F') { + Flush(); + } +} + +void AsyncLogger::Flush() { + MutexLock l(lock_); + DCHECK_EQ(state_, RUNNING); + + // Wake up the writer thread at least twice. + // This ensures that it has completely flushed both buffers. + uint64_t orig_flush_count = flush_count_; + while (flush_count_ < orig_flush_count + 2 && + state_ == RUNNING) { + active_buf_->flush = true; + wake_flusher_cond_.Signal(); + flush_complete_cond_.Wait(); + } +} + +uint32 AsyncLogger::LogSize() { + return wrapped_->LogSize(); +} + +void AsyncLogger::RunThread() { + MutexLock l(lock_); + while (state_ == RUNNING || active_buf_->needs_flush_or_write()) { + while (!active_buf_->needs_flush_or_write() && state_ == RUNNING) { + if (!wake_flusher_cond_.TimedWait(MonoDelta::FromSeconds(FLAGS_logbufsecs))) { + // In case of wait timeout, force it to flush regardless whether there is anything enqueued. + active_buf_->flush = true; + } + } + + active_buf_.swap(flushing_buf_); + // If the buffer that we are about to flush was full, then + // we may have other threads which were blocked that we now + // need to wake up. + if (BufferFull(*flushing_buf_)) { + free_buffer_cond_.Broadcast(); + } + l.Unlock(); + + for (const auto& msg : flushing_buf_->messages) { + wrapped_->Write(false, msg.ts, msg.message.data(), msg.message.size()); + } + if (flushing_buf_->flush) { + wrapped_->Flush(); + } + flushing_buf_->clear(); + + l.Lock(); + flush_count_++; + flush_complete_cond_.Broadcast(); + } +} + +bool AsyncLogger::BufferFull(const Buffer& buf) const { + // We evenly divide our total buffer space between the two buffers. + return buf.size > (max_buffer_bytes_ / 2); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/async_logger.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/async_logger.h b/be/src/kudu/util/async_logger.h new file mode 100644 index 0000000..2804a9b --- /dev/null +++ b/be/src/kudu/util/async_logger.h @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "kudu/gutil/macros.h" + +#include <memory> +#include <string> +#include <thread> +#include <vector> + +#include <glog/logging.h> + +#include "kudu/util/locks.h" + +namespace kudu { + +// Wrapper for a glog Logger which asynchronously writes log messages. +// This class starts a new thread responsible for forwarding the messages +// to the logger, and performs double buffering. Writers append to the +// current buffer and then wake up the logger thread. The logger swaps in +// a new buffer and writes any accumulated messages to the wrapped +// Logger. +// +// This double-buffering behavior dramatically improves performance, especially +// for logging messages which require flushing the underlying file (i.e WARNING +// and above for default). The flush can take a couple of milliseconds, and in +// some cases can even block for hundreds of milliseconds or more. With the +// double-buffered approach, threads can proceed with useful work while the IO +// thread blocks. +// +// The semantics provided by this wrapper are slightly weaker than the default +// glog semantics. By default, glog will immediately (synchronously) flush WARNING +// and above to the underlying file, whereas here we are deferring that flush to +// the separate thread. This means that a crash just after a 'LOG(WARNING)' would +// may be missing the message in the logs, but the perf benefit is probably +// worth it. We do take care that a glog FATAL message flushes all buffered log +// messages before exiting. +// +// NOTE: the logger limits the total amount of buffer space, so if the underlying +// log blocks for too long, eventually the threads generating the log messages +// will block as well. This prevents runaway memory usage. +class AsyncLogger : public google::base::Logger { + public: + AsyncLogger(google::base::Logger* wrapped, + int max_buffer_bytes); + ~AsyncLogger(); + + void Start(); + + // Stop the thread. Flush() and Write() must not be called after this. + // + // NOTE: this is currently only used in tests: in real life, we enable async + // logging once when the program starts and then never disable it. + // + // REQUIRES: Start() must have been called. + void Stop(); + + // Write a message to the log. + // + // 'force_flush' is set by the GLog library based on the configured '--logbuflevel' + // flag. Any messages logged at the configured level or higher result in 'force_flush' + // being set to true, indicating that the message should be immediately written to the + // log rather than buffered in memory. See the class-level docs above for more detail + // about the implementation provided here. + // + // REQUIRES: Start() must have been called. + void Write(bool force_flush, + time_t timestamp, + const char* message, + int message_len) override; + + // Flush any buffered messages. + void Flush() override; + + // Get the current LOG file size. + // The returned value is approximate since some + // logged data may not have been flushed to disk yet. + uint32 LogSize() override; + + // Return a count of how many times an application thread was + // blocked due to the buffers being full and the writer thread + // not keeping up. + int app_threads_blocked_count_for_tests() const { + MutexLock l(lock_); + return app_threads_blocked_count_for_tests_; + } + + private: + // A buffered message. + // + // TODO(todd): using std::string for buffered messages is convenient but not + // as efficient as it could be. Better would be to make the buffers just be + // Arenas and allocate both the message data and Msg struct from them, forming + // a linked list. + struct Msg { + time_t ts; + std::string message; + + Msg(time_t ts, std::string message) + : ts(ts), + message(std::move(message)) { + } + }; + + // A buffer of messages waiting to be flushed. + struct Buffer { + std::vector<Msg> messages; + + // Estimate of the size of 'messages'. + int size = 0; + + // Whether this buffer needs an explicit flush of the + // underlying logger. + bool flush = false; + + Buffer() {} + + void clear() { + messages.clear(); + size = 0; + flush = false; + } + + void add(Msg msg, bool flush) { + size += sizeof(msg) + msg.message.size(); + messages.emplace_back(std::move(msg)); + this->flush |= flush; + } + + bool needs_flush_or_write() const { + return flush || !messages.empty(); + } + + private: + DISALLOW_COPY_AND_ASSIGN(Buffer); + }; + + bool BufferFull(const Buffer& buf) const; + void RunThread(); + + // The maximum number of bytes used by the entire class. + const int max_buffer_bytes_; + google::base::Logger* const wrapped_; + std::thread thread_; + + // Count of how many times an application thread was blocked due to + // a full buffer. + int app_threads_blocked_count_for_tests_ = 0; + + // Count of how many times the writer thread has flushed the buffers. + // 64 bits should be enough to never worry about overflow. + uint64_t flush_count_ = 0; + + // Protects buffers as well as 'state_'. + mutable Mutex lock_; + + // Signaled by app threads to wake up the flusher, either for new + // data or because 'state_' changed. + ConditionVariable wake_flusher_cond_; + + // Signaled by the flusher thread when the flusher has swapped in + // a free buffer to write to. + ConditionVariable free_buffer_cond_; + + // Signaled by the flusher thread when it has completed flushing + // the current buffer. + ConditionVariable flush_complete_cond_; + + // The buffer to which application threads append new log messages. + std::unique_ptr<Buffer> active_buf_; + + // The buffer currently being flushed by the logger thread, cleared + // after a successful flush. + std::unique_ptr<Buffer> flushing_buf_; + + // Trigger for the logger thread to stop. + enum State { + INITTED, + RUNNING, + STOPPED + }; + State state_ = INITTED; + + DISALLOW_COPY_AND_ASSIGN(AsyncLogger); +}; + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/async_util.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/async_util.h b/be/src/kudu/util/async_util.h new file mode 100644 index 0000000..1e2830c --- /dev/null +++ b/be/src/kudu/util/async_util.h @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Utility functions which are handy when doing async/callback-based programming. +#ifndef KUDU_UTIL_ASYNC_UTIL_H +#define KUDU_UTIL_ASYNC_UTIL_H + +#include "kudu/gutil/bind.h" +#include "kudu/gutil/macros.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/status.h" +#include "kudu/util/status_callback.h" + +namespace kudu { + +// Simple class which can be used to make async methods synchronous. +// For example: +// Synchronizer s; +// SomeAsyncMethod(s.callback()); +// CHECK_OK(s.Wait()); +class Synchronizer { + public: + Synchronizer() + : l(1) { + } + void StatusCB(const Status& status) { + s = status; + l.CountDown(); + } + StatusCallback AsStatusCallback() { + // Synchronizers are often declared on the stack, so it doesn't make + // sense for a callback to take a reference to its synchronizer. + // + // Note: this means the returned callback _must_ go out of scope before + // its synchronizer. + return Bind(&Synchronizer::StatusCB, Unretained(this)); + } + Status Wait() { + l.Wait(); + return s; + } + Status WaitFor(const MonoDelta& delta) { + if (PREDICT_FALSE(!l.WaitFor(delta))) { + return Status::TimedOut("Timed out while waiting for the callback to be called."); + } + return s; + } + void Reset() { + l.Reset(1); + } + private: + DISALLOW_COPY_AND_ASSIGN(Synchronizer); + Status s; + CountDownLatch l; +}; + +} // namespace kudu +#endif /* KUDU_UTIL_ASYNC_UTIL_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/atomic-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/atomic-test.cc b/be/src/kudu/util/atomic-test.cc new file mode 100644 index 0000000..9834d09 --- /dev/null +++ b/be/src/kudu/util/atomic-test.cc @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/atomic.h" + +#include <limits> +#include <vector> + +#include <gtest/gtest.h> + +#include "kudu/util/test_util.h" + +namespace kudu { + +using std::numeric_limits; +using std::vector; + +// TODO Add some multi-threaded tests; currently AtomicInt is just a +// wrapper around 'atomicops.h', but should the underlying +// implemention change, it would help to have tests that make sure +// invariants are preserved in a multi-threaded environment. + +template<typename T> +class AtomicIntTest : public KuduTest { + public: + + AtomicIntTest() + : max_(numeric_limits<T>::max()), + min_(numeric_limits<T>::min()) { + acquire_release_ = { kMemOrderNoBarrier, kMemOrderAcquire, kMemOrderRelease }; + barrier_ = { kMemOrderNoBarrier, kMemOrderBarrier }; + } + + vector<MemoryOrder> acquire_release_; + vector<MemoryOrder> barrier_; + + T max_; + T min_; +}; + +typedef ::testing::Types<int32_t, int64_t, uint32_t, uint64_t> IntTypes; +TYPED_TEST_CASE(AtomicIntTest, IntTypes); + +TYPED_TEST(AtomicIntTest, LoadStore) { + for (const MemoryOrder mem_order : this->acquire_release_) { + AtomicInt<TypeParam> i(0); + EXPECT_EQ(0, i.Load(mem_order)); + i.Store(42, mem_order); + EXPECT_EQ(42, i.Load(mem_order)); + i.Store(this->min_, mem_order); + EXPECT_EQ(this->min_, i.Load(mem_order)); + i.Store(this->max_, mem_order); + EXPECT_EQ(this->max_, i.Load(mem_order)); + } +} + +TYPED_TEST(AtomicIntTest, SetSwapExchange) { + for (const MemoryOrder mem_order : this->acquire_release_) { + AtomicInt<TypeParam> i(0); + EXPECT_TRUE(i.CompareAndSet(0, 5, mem_order)); + EXPECT_EQ(5, i.Load(mem_order)); + EXPECT_FALSE(i.CompareAndSet(0, 10, mem_order)); + + EXPECT_EQ(5, i.CompareAndSwap(5, this->max_, mem_order)); + EXPECT_EQ(this->max_, i.CompareAndSwap(42, 42, mem_order)); + EXPECT_EQ(this->max_, i.CompareAndSwap(this->max_, this->min_, mem_order)); + + EXPECT_EQ(this->min_, i.Exchange(this->max_, mem_order)); + EXPECT_EQ(this->max_, i.Load(mem_order)); + } +} + +TYPED_TEST(AtomicIntTest, MinMax) { + for (const MemoryOrder mem_order : this->acquire_release_) { + AtomicInt<TypeParam> i(0); + + i.StoreMax(100, mem_order); + EXPECT_EQ(100, i.Load(mem_order)); + i.StoreMin(50, mem_order); + EXPECT_EQ(50, i.Load(mem_order)); + + i.StoreMax(25, mem_order); + EXPECT_EQ(50, i.Load(mem_order)); + i.StoreMin(75, mem_order); + EXPECT_EQ(50, i.Load(mem_order)); + + i.StoreMax(this->max_, mem_order); + EXPECT_EQ(this->max_, i.Load(mem_order)); + i.StoreMin(this->min_, mem_order); + EXPECT_EQ(this->min_, i.Load(mem_order)); + } +} + +TYPED_TEST(AtomicIntTest, Increment) { + for (const MemoryOrder mem_order : this->barrier_) { + AtomicInt<TypeParam> i(0); + EXPECT_EQ(1, i.Increment(mem_order)); + EXPECT_EQ(3, i.IncrementBy(2, mem_order)); + EXPECT_EQ(3, i.IncrementBy(0, mem_order)); + } +} + +TEST(Atomic, AtomicBool) { + vector<MemoryOrder> memory_orders = { kMemOrderNoBarrier, kMemOrderRelease, kMemOrderAcquire }; + for (const MemoryOrder mem_order : memory_orders) { + AtomicBool b(false); + EXPECT_FALSE(b.Load(mem_order)); + b.Store(true, mem_order); + EXPECT_TRUE(b.Load(mem_order)); + EXPECT_TRUE(b.CompareAndSet(true, false, mem_order)); + EXPECT_FALSE(b.Load(mem_order)); + EXPECT_FALSE(b.CompareAndSet(true, false, mem_order)); + EXPECT_FALSE(b.CompareAndSwap(false, true, mem_order)); + EXPECT_TRUE(b.Load(mem_order)); + EXPECT_TRUE(b.Exchange(false, mem_order)); + EXPECT_FALSE(b.Load(mem_order)); + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/atomic.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/atomic.cc b/be/src/kudu/util/atomic.cc new file mode 100644 index 0000000..1fdae39 --- /dev/null +++ b/be/src/kudu/util/atomic.cc @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/atomic.h" + +#include <stdint.h> + +#include <glog/logging.h> + +namespace kudu { + +template<typename T> +AtomicInt<T>::AtomicInt(T initial_value) { + Store(initial_value, kMemOrderNoBarrier); +} + +template<typename T> +void AtomicInt<T>::FatalMemOrderNotSupported(const char* caller, + const char* requested, + const char* supported) { + LOG(FATAL) << caller << " does not support " << requested << ": only " + << supported << " are supported."; +} + +template +class AtomicInt<int32_t>; + +template +class AtomicInt<int64_t>; + +template +class AtomicInt<uint32_t>; + +template +class AtomicInt<uint64_t>; + +AtomicBool::AtomicBool(bool value) + : underlying_(value) { +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/atomic.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/atomic.h b/be/src/kudu/util/atomic.h new file mode 100644 index 0000000..da9b677 --- /dev/null +++ b/be/src/kudu/util/atomic.h @@ -0,0 +1,320 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef KUDU_UTIL_ATOMIC_H +#define KUDU_UTIL_ATOMIC_H + +#include <algorithm> +#include <type_traits> + +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/port.h" + +namespace kudu { + +// See top-level comments in kudu/gutil/atomicops.h for further +// explanations of these levels. +enum MemoryOrder { + // Relaxed memory ordering, doesn't use any barriers. + kMemOrderNoBarrier = 0, + + // Ensures that no later memory access by the same thread can be + // reordered ahead of the operation. + kMemOrderAcquire = 1, + + // Ensures that no previous memory access by the same thread can be + // reordered after the operation. + kMemOrderRelease = 2, + + // Ensures that neither previous NOR later memory access by the same + // thread can be reordered after the operation. + kMemOrderBarrier = 3, +}; + +// Atomic integer class inspired by Impala's AtomicInt and +// std::atomic<> in C++11. +// +// NOTE: All of public operations use an implicit memory order of +// kMemOrderNoBarrier unless otherwise specified. +// +// Unlike std::atomic<>, overflowing an unsigned AtomicInt via Increment or +// IncrementBy is undefined behavior (it is also undefined for signed types, +// as always). +// +// See also: kudu/gutil/atomicops.h +template<typename T> +class AtomicInt { + public: + // Initialize the underlying value to 'initial_value'. The + // initialization performs a Store with 'kMemOrderNoBarrier'. + explicit AtomicInt(T initial_value); + + // Returns the underlying value. + // + // Does not support 'kMemOrderBarrier'. + T Load(MemoryOrder mem_order = kMemOrderNoBarrier) const; + + // Sets the underlying value to 'new_value'. + // + // Does not support 'kMemOrderBarrier'. + void Store(T new_value, MemoryOrder mem_order = kMemOrderNoBarrier); + + // Iff the underlying value is equal to 'expected_val', sets the + // underlying value to 'new_value' and returns true; returns false + // otherwise. + // + // Does not support 'kMemOrderBarrier'. + bool CompareAndSet(T expected_val, T new_value, MemoryOrder mem_order = kMemOrderNoBarrier); + + // Iff the underlying value is equal to 'expected_val', sets the + // underlying value to 'new_value' and returns + // 'expected_val'. Otherwise, returns the current underlying + // value. + // + // Does not support 'kMemOrderBarrier'. + T CompareAndSwap(T expected_val, T new_value, MemoryOrder mem_order = kMemOrderNoBarrier); + + // Sets the underlying value to 'new_value' iff 'new_value' is + // greater than the current underlying value. + // + // Does not support 'kMemOrderBarrier'. + void StoreMax(T new_value, MemoryOrder mem_order = kMemOrderNoBarrier); + + // Sets the underlying value to 'new_value' iff 'new_value' is less + // than the current underlying value. + // + // Does not support 'kMemOrderBarrier'. + void StoreMin(T new_value, MemoryOrder mem_order = kMemOrderNoBarrier); + + // Increments the underlying value by 1 and returns the new + // underlying value. + // + // Does not support 'kMemOrderAcquire' or 'kMemOrderRelease'. + T Increment(MemoryOrder mem_order = kMemOrderNoBarrier); + + // Increments the underlying value by 'delta' and returns the new + // underlying value. + + // Does not support 'kKemOrderAcquire' or 'kMemOrderRelease'. + T IncrementBy(T delta, MemoryOrder mem_order = kMemOrderNoBarrier); + + // Sets the underlying value to 'new_value' and returns the previous + // underlying value. + // + // Does not support 'kMemOrderBarrier'. + T Exchange(T new_value, MemoryOrder mem_order = kMemOrderNoBarrier); + + private: + // If a method 'caller' doesn't support memory order described as + // 'requested', exit by doing perform LOG(FATAL) logging the method + // called, the requested memory order, and the supported memory + // orders. + static void FatalMemOrderNotSupported(const char* caller, + const char* requested = "kMemOrderBarrier", + const char* supported = + "kMemNorderNoBarrier, kMemOrderAcquire, kMemOrderRelease"); + + // The gutil/atomicops.h functions only operate on signed types. + // So, even if the user specializes on an unsigned type, we use a + // signed type internally. + typedef typename std::make_signed<T>::type SignedT; + SignedT value_; + + DISALLOW_COPY_AND_ASSIGN(AtomicInt); +}; + +// Adapts AtomicInt to handle boolean values. +// +// NOTE: All of public operations use an implicit memory order of +// kMemOrderNoBarrier unless otherwise specified. +// +// See AtomicInt above for documentation on individual methods. +class AtomicBool { + public: + explicit AtomicBool(bool value); + + bool Load(MemoryOrder m = kMemOrderNoBarrier) const { + return underlying_.Load(m); + } + void Store(bool n, MemoryOrder m = kMemOrderNoBarrier) { + underlying_.Store(static_cast<int32_t>(n), m); + } + bool CompareAndSet(bool e, bool n, MemoryOrder m = kMemOrderNoBarrier) { + return underlying_.CompareAndSet(static_cast<int32_t>(e), static_cast<int32_t>(n), m); + } + bool CompareAndSwap(bool e, bool n, MemoryOrder m = kMemOrderNoBarrier) { + return underlying_.CompareAndSwap(static_cast<int32_t>(e), static_cast<int32_t>(n), m); + } + bool Exchange(bool n, MemoryOrder m = kMemOrderNoBarrier) { + return underlying_.Exchange(static_cast<int32_t>(n), m); + } + private: + AtomicInt<int32_t> underlying_; + + DISALLOW_COPY_AND_ASSIGN(AtomicBool); +}; + +template<typename T> +inline T AtomicInt<T>::Load(MemoryOrder mem_order) const { + switch (mem_order) { + case kMemOrderNoBarrier: { + return base::subtle::NoBarrier_Load(&value_); + } + case kMemOrderBarrier: { + FatalMemOrderNotSupported("Load"); + break; + } + case kMemOrderAcquire: { + return base::subtle::Acquire_Load(&value_); + } + case kMemOrderRelease: { + return base::subtle::Release_Load(&value_); + } + } + abort(); // Unnecessary, but avoids gcc complaining. +} + +template<typename T> +inline void AtomicInt<T>::Store(T new_value, MemoryOrder mem_order) { + switch (mem_order) { + case kMemOrderNoBarrier: { + base::subtle::NoBarrier_Store(&value_, new_value); + break; + } + case kMemOrderBarrier: { + FatalMemOrderNotSupported("Store"); + break; + } + case kMemOrderAcquire: { + base::subtle::Acquire_Store(&value_, new_value); + break; + } + case kMemOrderRelease: { + base::subtle::Release_Store(&value_, new_value); + break; + } + } +} + +template<typename T> +inline bool AtomicInt<T>::CompareAndSet(T expected_val, T new_val, MemoryOrder mem_order) { + return CompareAndSwap(expected_val, new_val, mem_order) == expected_val; +} + +template<typename T> +inline T AtomicInt<T>::CompareAndSwap(T expected_val, T new_val, MemoryOrder mem_order) { + switch (mem_order) { + case kMemOrderNoBarrier: { + return base::subtle::NoBarrier_CompareAndSwap( + &value_, expected_val, new_val); + } + case kMemOrderBarrier: { + FatalMemOrderNotSupported("CompareAndSwap/CompareAndSet"); + break; + } + case kMemOrderAcquire: { + return base::subtle::Acquire_CompareAndSwap( + &value_, expected_val, new_val); + } + case kMemOrderRelease: { + return base::subtle::Release_CompareAndSwap( + &value_, expected_val, new_val); + } + } + abort(); +} + + +template<typename T> +inline T AtomicInt<T>::Increment(MemoryOrder mem_order) { + return IncrementBy(1, mem_order); +} + +template<typename T> +inline T AtomicInt<T>::IncrementBy(T delta, MemoryOrder mem_order) { + switch (mem_order) { + case kMemOrderNoBarrier: { + return base::subtle::NoBarrier_AtomicIncrement(&value_, delta); + } + case kMemOrderBarrier: { + return base::subtle::Barrier_AtomicIncrement(&value_, delta); + } + case kMemOrderAcquire: { + FatalMemOrderNotSupported("Increment/IncrementBy", + "kMemOrderAcquire", + "kMemOrderNoBarrier and kMemOrderBarrier"); + break; + } + case kMemOrderRelease: { + FatalMemOrderNotSupported("Increment/Incrementby", + "kMemOrderAcquire", + "kMemOrderNoBarrier and kMemOrderBarrier"); + break; + } + } + abort(); +} + +template<typename T> +inline T AtomicInt<T>::Exchange(T new_value, MemoryOrder mem_order) { + switch (mem_order) { + case kMemOrderNoBarrier: { + return base::subtle::NoBarrier_AtomicExchange(&value_, new_value); + } + case kMemOrderBarrier: { + FatalMemOrderNotSupported("Exchange"); + break; + } + case kMemOrderAcquire: { + return base::subtle::Acquire_AtomicExchange(&value_, new_value); + } + case kMemOrderRelease: { + return base::subtle::Release_AtomicExchange(&value_, new_value); + } + } + abort(); +} + +template<typename T> +inline void AtomicInt<T>::StoreMax(T new_value, MemoryOrder mem_order) { + T old_value = Load(mem_order); + while (true) { + T max_value = std::max(old_value, new_value); + T prev_value = CompareAndSwap(old_value, max_value, mem_order); + if (PREDICT_TRUE(old_value == prev_value)) { + break; + } + old_value = prev_value; + } +} + +template<typename T> +inline void AtomicInt<T>::StoreMin(T new_value, MemoryOrder mem_order) { + T old_value = Load(mem_order); + while (true) { + T min_value = std::min(old_value, new_value); + T prev_value = CompareAndSwap(old_value, min_value, mem_order); + if (PREDICT_TRUE(old_value == prev_value)) { + break; + } + old_value = prev_value; + } +} + +} // namespace kudu +#endif /* KUDU_UTIL_ATOMIC_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/auto_release_pool.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/auto_release_pool.h b/be/src/kudu/util/auto_release_pool.h new file mode 100644 index 0000000..eaed9c2 --- /dev/null +++ b/be/src/kudu/util/auto_release_pool.h @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Simple pool of objects that will be deallocated when the pool is +// destroyed + +#ifndef KUDU_UTIL_AUTO_RELEASE_POOL_H +#define KUDU_UTIL_AUTO_RELEASE_POOL_H + +#include <vector> + +#include "kudu/gutil/spinlock.h" + +namespace kudu { + +// Thread-safe. +class AutoReleasePool { + public: + AutoReleasePool(): objects_() { } + + ~AutoReleasePool() { + for (auto& object : objects_) { + delete object; + } + } + + template <class T> + T *Add(T *t) { + base::SpinLockHolder l(&lock_); + objects_.push_back(new SpecificElement<T>(t)); + return t; + } + + // Add an array-allocated object to the pool. This is identical to + // Add() except that it will be freed with 'delete[]' instead of 'delete'. + template<class T> + T* AddArray(T *t) { + base::SpinLockHolder l(&lock_); + objects_.push_back(new SpecificArrayElement<T>(t)); + return t; + } + + // Donate all objects in this pool to another pool. + void DonateAllTo(AutoReleasePool* dst) { + base::SpinLockHolder l(&lock_); + base::SpinLockHolder l_them(&dst->lock_); + + dst->objects_.reserve(dst->objects_.size() + objects_.size()); + dst->objects_.insert(dst->objects_.end(), objects_.begin(), objects_.end()); + objects_.clear(); + } + + private: + struct GenericElement { + virtual ~GenericElement() {} + }; + + template <class T> + struct SpecificElement : GenericElement { + explicit SpecificElement(T *t): t(t) {} + ~SpecificElement() { + delete t; + } + + T *t; + }; + + template <class T> + struct SpecificArrayElement : GenericElement { + explicit SpecificArrayElement(T *t): t(t) {} + ~SpecificArrayElement() { + delete [] t; + } + + T *t; + }; + + typedef std::vector<GenericElement *> ElementVector; + ElementVector objects_; + base::SpinLock lock_; +}; + + +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/barrier.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/barrier.h b/be/src/kudu/util/barrier.h new file mode 100644 index 0000000..88e5682 --- /dev/null +++ b/be/src/kudu/util/barrier.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "kudu/gutil/macros.h" +#include "kudu/util/condition_variable.h" +#include "kudu/util/mutex.h" +#include "kudu/util/thread_restrictions.h" + +namespace kudu { + +// Implementation of pthread-style Barriers. +class Barrier { + public: + // Initialize the barrier with the given initial count. + explicit Barrier(int count) : + cond_(&mutex_), + count_(count), + initial_count_(count) { + DCHECK_GT(count, 0); + } + + ~Barrier() { + } + + // Wait until all threads have reached the barrier. + // Once all threads have reached the barrier, the barrier is reset + // to the initial count. + void Wait() { + ThreadRestrictions::AssertWaitAllowed(); + MutexLock l(mutex_); + if (--count_ == 0) { + count_ = initial_count_; + cycle_count_++; + cond_.Broadcast(); + return; + } + + int initial_cycle = cycle_count_; + while (cycle_count_ == initial_cycle) { + cond_.Wait(); + } + } + + private: + Mutex mutex_; + ConditionVariable cond_; + int count_; + uint32_t cycle_count_ = 0; + const int initial_count_; + DISALLOW_COPY_AND_ASSIGN(Barrier); +}; + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bit-stream-utils.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/bit-stream-utils.h b/be/src/kudu/util/bit-stream-utils.h new file mode 100644 index 0000000..c6aeb01 --- /dev/null +++ b/be/src/kudu/util/bit-stream-utils.h @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef IMPALA_UTIL_BIT_STREAM_UTILS_H +#define IMPALA_UTIL_BIT_STREAM_UTILS_H + +#include "kudu/gutil/port.h" +#include "kudu/util/bit-util.h" +#include "kudu/util/faststring.h" + +namespace kudu { + +// Utility class to write bit/byte streams. This class can write data to either be +// bit packed or byte aligned (and a single stream that has a mix of both). +class BitWriter { + public: + // buffer: buffer to write bits to. + explicit BitWriter(faststring *buffer) + : buffer_(buffer) { + Clear(); + } + + void Clear() { + buffered_values_ = 0; + byte_offset_ = 0; + bit_offset_ = 0; + buffer_->clear(); + } + + // Returns a pointer to the underlying buffer + faststring *buffer() const { return buffer_; } + + // The number of current bytes written, including the current byte (i.e. may include a + // fraction of a byte). Includes buffered values. + int bytes_written() const { return byte_offset_ + BitUtil::Ceil(bit_offset_, 8); } + + // Writes a value to buffered_values_, flushing to buffer_ if necessary. This is bit + // packed. num_bits must be <= 32. If 'v' is larger than 'num_bits' bits, the higher + // bits are ignored. + void PutValue(uint64_t v, int num_bits); + + // Writes v to the next aligned byte using num_bits. If T is larger than num_bits, the + // extra high-order bits will be ignored. + template<typename T> + void PutAligned(T v, int num_bits); + + // Write a Vlq encoded int to the buffer. The value is written byte aligned. + // For more details on vlq: en.wikipedia.org/wiki/Variable-length_quantity + void PutVlqInt(int32_t v); + + // Get the index to the next aligned byte and advance the underlying buffer by num_bytes. + size_t GetByteIndexAndAdvance(int num_bytes) { + uint8_t* ptr = GetNextBytePtr(num_bytes); + return ptr - buffer_->data(); + } + + // Get a pointer to the next aligned byte and advance the underlying buffer by num_bytes. + uint8_t* GetNextBytePtr(int num_bytes); + + // Flushes all buffered values to the buffer. Call this when done writing to the buffer. + // If 'align' is true, buffered_values_ is reset and any future writes will be written + // to the next byte boundary. + void Flush(bool align = false); + + private: + // Bit-packed values are initially written to this variable before being memcpy'd to + // buffer_. This is faster than writing values byte by byte directly to buffer_. + uint64_t buffered_values_; + + faststring *buffer_; + int byte_offset_; // Offset in buffer_ + int bit_offset_; // Offset in buffered_values_ +}; + +// Utility class to read bit/byte stream. This class can read bits or bytes +// that are either byte aligned or not. It also has utilities to read multiple +// bytes in one read (e.g. encoded int). +class BitReader { + public: + // 'buffer' is the buffer to read from. The buffer's length is 'buffer_len'. + BitReader(const uint8_t* buffer, int buffer_len); + + BitReader() : buffer_(NULL), max_bytes_(0) {} + + // Gets the next value from the buffer. Returns true if 'v' could be read or false if + // there are not enough bytes left. num_bits must be <= 32. + template<typename T> + bool GetValue(int num_bits, T* v); + + // Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T needs to be a + // little-endian native type and big enough to store 'num_bytes'. The value is assumed + // to be byte-aligned so the stream will be advanced to the start of the next byte + // before 'v' is read. Returns false if there are not enough bytes left. + template<typename T> + bool GetAligned(int num_bytes, T* v); + + // Reads a vlq encoded int from the stream. The encoded int must start at the + // beginning of a byte. Return false if there were not enough bytes in the buffer. + bool GetVlqInt(int32_t* v); + + // Returns the number of bytes left in the stream, not including the current byte (i.e., + // there may be an additional fraction of a byte). + int bytes_left() { return max_bytes_ - (byte_offset_ + BitUtil::Ceil(bit_offset_, 8)); } + + // Current position in the stream, by bit. + int position() const { return byte_offset_ * 8 + bit_offset_; } + + // Rewind the stream by 'num_bits' bits + void Rewind(int num_bits); + + // Seek to a specific bit in the buffer + void SeekToBit(uint stream_position); + + // Maximum byte length of a vlq encoded int + static const int MAX_VLQ_BYTE_LEN = 5; + + bool is_initialized() const { return buffer_ != NULL; } + + private: + // Used by SeekToBit() and GetValue() to fetch the + // the next word into buffer_. + void BufferValues(); + + const uint8_t* buffer_; + int max_bytes_; + + // Bytes are memcpy'd from buffer_ and values are read from this variable. This is + // faster than reading values byte by byte directly from buffer_. + uint64_t buffered_values_; + + int byte_offset_; // Offset in buffer_ + int bit_offset_; // Offset in buffered_values_ +}; + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bit-stream-utils.inline.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/bit-stream-utils.inline.h b/be/src/kudu/util/bit-stream-utils.inline.h new file mode 100644 index 0000000..569197b --- /dev/null +++ b/be/src/kudu/util/bit-stream-utils.inline.h @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef IMPALA_UTIL_BIT_STREAM_UTILS_INLINE_H +#define IMPALA_UTIL_BIT_STREAM_UTILS_INLINE_H + +#include <algorithm> + +#include "glog/logging.h" +#include "kudu/util/bit-stream-utils.h" +#include "kudu/util/alignment.h" + +namespace kudu { + +inline void BitWriter::PutValue(uint64_t v, int num_bits) { + DCHECK_LE(num_bits, 64); + // Truncate the higher-order bits. This is necessary to + // support signed values. + v &= ~0ULL >> (64 - num_bits); + + + buffered_values_ |= v << bit_offset_; + bit_offset_ += num_bits; + + if (PREDICT_FALSE(bit_offset_ >= 64)) { + // Flush buffered_values_ and write out bits of v that did not fit + buffer_->reserve(KUDU_ALIGN_UP(byte_offset_ + 8, 8)); + buffer_->resize(byte_offset_ + 8); + DCHECK_LE(byte_offset_ + 8, buffer_->capacity()); + memcpy(buffer_->data() + byte_offset_, &buffered_values_, 8); + buffered_values_ = 0; + byte_offset_ += 8; + bit_offset_ -= 64; + buffered_values_ = BitUtil::ShiftRightZeroOnOverflow(v, (num_bits - bit_offset_)); + } + DCHECK_LT(bit_offset_, 64); +} + +inline void BitWriter::Flush(bool align) { + int num_bytes = BitUtil::Ceil(bit_offset_, 8); + buffer_->reserve(KUDU_ALIGN_UP(byte_offset_ + num_bytes, 8)); + buffer_->resize(byte_offset_ + num_bytes); + DCHECK_LE(byte_offset_ + num_bytes, buffer_->capacity()); + memcpy(buffer_->data() + byte_offset_, &buffered_values_, num_bytes); + + if (align) { + buffered_values_ = 0; + byte_offset_ += num_bytes; + bit_offset_ = 0; + } +} + +inline uint8_t* BitWriter::GetNextBytePtr(int num_bytes) { + Flush(/* align */ true); + buffer_->reserve(KUDU_ALIGN_UP(byte_offset_ + num_bytes, 8)); + buffer_->resize(byte_offset_ + num_bytes); + uint8_t* ptr = buffer_->data() + byte_offset_; + byte_offset_ += num_bytes; + DCHECK_LE(byte_offset_, buffer_->capacity()); + return ptr; +} + +template<typename T> +inline void BitWriter::PutAligned(T val, int num_bytes) { + DCHECK_LE(num_bytes, sizeof(T)); + uint8_t* ptr = GetNextBytePtr(num_bytes); + memcpy(ptr, &val, num_bytes); +} + +inline void BitWriter::PutVlqInt(int32_t v) { + while ((v & 0xFFFFFF80) != 0L) { + PutAligned<uint8_t>((v & 0x7F) | 0x80, 1); + v >>= 7; + } + PutAligned<uint8_t>(v & 0x7F, 1); +} + + +inline BitReader::BitReader(const uint8_t* buffer, int buffer_len) + : buffer_(buffer), + max_bytes_(buffer_len), + buffered_values_(0), + byte_offset_(0), + bit_offset_(0) { + int num_bytes = std::min(8, max_bytes_); + memcpy(&buffered_values_, buffer_ + byte_offset_, num_bytes); +} + +inline void BitReader::BufferValues() { + int bytes_remaining = max_bytes_ - byte_offset_; + if (PREDICT_TRUE(bytes_remaining >= 8)) { + memcpy(&buffered_values_, buffer_ + byte_offset_, 8); + } else { + memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining); + } +} + +template<typename T> +inline bool BitReader::GetValue(int num_bits, T* v) { + DCHECK_LE(num_bits, 64); + DCHECK_LE(num_bits, sizeof(T) * 8); + + if (PREDICT_FALSE(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) return false; + + *v = BitUtil::TrailingBits(buffered_values_, bit_offset_ + num_bits) >> bit_offset_; + + bit_offset_ += num_bits; + if (bit_offset_ >= 64) { + byte_offset_ += 8; + bit_offset_ -= 64; + BufferValues(); + // Read bits of v that crossed into new buffered_values_ + *v |= BitUtil::ShiftLeftZeroOnOverflow( + BitUtil::TrailingBits(buffered_values_, bit_offset_), + (num_bits - bit_offset_)); + } + DCHECK_LE(bit_offset_, 64); + return true; +} + +inline void BitReader::Rewind(int num_bits) { + bit_offset_ -= num_bits; + if (bit_offset_ >= 0) { + return; + } + while (bit_offset_ < 0) { + int seek_back = std::min(byte_offset_, 8); + byte_offset_ -= seek_back; + bit_offset_ += seek_back * 8; + } + // This should only be executed *if* rewinding by 'num_bits' + // make the existing buffered_values_ invalid + DCHECK_GE(byte_offset_, 0); // Check for underflow + memcpy(&buffered_values_, buffer_ + byte_offset_, 8); +} + +inline void BitReader::SeekToBit(uint stream_position) { + DCHECK_LE(stream_position, max_bytes_ * 8); + + int delta = stream_position - position(); + if (delta == 0) { + return; + } else if (delta < 0) { + Rewind(position() - stream_position); + } else { + bit_offset_ += delta; + while (bit_offset_ >= 64) { + byte_offset_ +=8; + bit_offset_ -= 64; + if (bit_offset_ < 64) { + // This should only be executed if seeking to + // 'stream_position' makes the existing buffered_values_ + // invalid. + BufferValues(); + } + } + } +} + +template<typename T> +inline bool BitReader::GetAligned(int num_bytes, T* v) { + DCHECK_LE(num_bytes, sizeof(T)); + int bytes_read = BitUtil::Ceil(bit_offset_, 8); + if (PREDICT_FALSE(byte_offset_ + bytes_read + num_bytes > max_bytes_)) return false; + + // Advance byte_offset to next unread byte and read num_bytes + byte_offset_ += bytes_read; + memcpy(v, buffer_ + byte_offset_, num_bytes); + byte_offset_ += num_bytes; + + // Reset buffered_values_ + bit_offset_ = 0; + int bytes_remaining = max_bytes_ - byte_offset_; + if (PREDICT_TRUE(bytes_remaining >= 8)) { + memcpy(&buffered_values_, buffer_ + byte_offset_, 8); + } else { + memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining); + } + return true; +} + +inline bool BitReader::GetVlqInt(int32_t* v) { + *v = 0; + int shift = 0; + int num_bytes = 0; + uint8_t byte = 0; + do { + if (!GetAligned<uint8_t>(1, &byte)) return false; + *v |= (byte & 0x7F) << shift; + shift += 7; + DCHECK_LE(++num_bytes, MAX_VLQ_BYTE_LEN); + } while ((byte & 0x80) != 0); + return true; +} + +} // namespace kudu + +#endif
