http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/build_tools/make_package.sh ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/build_tools/make_package.sh b/thirdparty/rocksdb/build_tools/make_package.sh new file mode 100755 index 0000000..58bac44 --- /dev/null +++ b/thirdparty/rocksdb/build_tools/make_package.sh @@ -0,0 +1,128 @@ +#/usr/bin/env bash + +set -e + +function log() { + echo "[+] $1" +} + +function fatal() { + echo "[!] $1" + exit 1 +} + +function platform() { + local __resultvar=$1 + if [[ -f "/etc/yum.conf" ]]; then + eval $__resultvar="centos" + elif [[ -f "/etc/dpkg/dpkg.cfg" ]]; then + eval $__resultvar="ubuntu" + else + fatal "Unknwon operating system" + fi +} +platform OS + +function package() { + if [[ $OS = "ubuntu" ]]; then + if dpkg --get-selections | grep --quiet $1; then + log "$1 is already installed. skipping." + else + apt-get install $@ -y + fi + elif [[ $OS = "centos" ]]; then + if rpm -qa | grep --quiet $1; then + log "$1 is already installed. skipping." + else + yum install $@ -y + fi + fi +} + +function detect_fpm_output() { + if [[ $OS = "ubuntu" ]]; then + export FPM_OUTPUT=deb + elif [[ $OS = "centos" ]]; then + export FPM_OUTPUT=rpm + fi +} +detect_fpm_output + +function gem_install() { + if gem list | grep --quiet $1; then + log "$1 is already installed. skipping." + else + gem install $@ + fi +} + +function main() { + if [[ $# -ne 1 ]]; then + fatal "Usage: $0 <rocksdb_version>" + else + log "using rocksdb version: $1" + fi + + if [[ -d /vagrant ]]; then + if [[ $OS = "ubuntu" ]]; then + package g++-4.8 + export CXX=g++-4.8 + + # the deb would depend on libgflags2, but the static lib is the only thing + # installed by make install + package libgflags-dev + + package ruby-all-dev + elif [[ $OS = "centos" ]]; then + pushd /etc/yum.repos.d + if [[ ! 
-f /etc/yum.repos.d/devtools-1.1.repo ]]; then + wget http://people.centos.org/tru/devtools-1.1/devtools-1.1.repo + fi + package devtoolset-1.1-gcc --enablerepo=testing-1.1-devtools-6 + package devtoolset-1.1-gcc-c++ --enablerepo=testing-1.1-devtools-6 + export CC=/opt/centos/devtoolset-1.1/root/usr/bin/gcc + export CPP=/opt/centos/devtoolset-1.1/root/usr/bin/cpp + export CXX=/opt/centos/devtoolset-1.1/root/usr/bin/c++ + export PATH=$PATH:/opt/centos/devtoolset-1.1/root/usr/bin + popd + if ! rpm -qa | grep --quiet gflags; then + rpm -i https://github.com/schuhschuh/gflags/releases/download/v2.1.0/gflags-devel-2.1.0-1.amd64.rpm + fi + + package ruby + package ruby-devel + package rubygems + package rpm-build + fi + fi + gem_install fpm + + make static_lib + make install INSTALL_PATH=package + + cd package + + LIB_DIR=lib + if [[ -z "$ARCH" ]]; then + ARCH=$(getconf LONG_BIT) + fi + if [[ ("$FPM_OUTPUT" = "rpm") && ($ARCH -eq 64) ]]; then + mv lib lib64 + LIB_DIR=lib64 + fi + + fpm \ + -s dir \ + -t $FPM_OUTPUT \ + -n rocksdb \ + -v $1 \ + --prefix /usr \ + --url http://rocksdb.org/ \ + -m [email protected] \ + --license BSD \ + --vendor Facebook \ + --description "RocksDB is an embeddable persistent key-value store for fast storage." \ + include $LIB_DIR +} + +main $@
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/build_tools/precommit_checker.py ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/build_tools/precommit_checker.py b/thirdparty/rocksdb/build_tools/precommit_checker.py new file mode 100755 index 0000000..0f8884d --- /dev/null +++ b/thirdparty/rocksdb/build_tools/precommit_checker.py @@ -0,0 +1,208 @@ +#!/usr/local/fbcode/gcc-4.9-glibc-2.20-fb/bin/python2.7 + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals +import argparse +import commands +import subprocess +import sys +import re +import os +import time + + +# +# Simple logger +# + +class Log: + + def __init__(self, filename): + self.filename = filename + self.f = open(self.filename, 'w+', 0) + + def caption(self, str): + line = "\n##### %s #####\n" % str + if self.f: + self.f.write("%s \n" % line) + else: + print(line) + + def error(self, str): + data = "\n\n##### ERROR ##### %s" % str + if self.f: + self.f.write("%s \n" % data) + else: + print(data) + + def log(self, str): + if self.f: + self.f.write("%s \n" % str) + else: + print(str) + +# +# Shell Environment +# + + +class Env(object): + + def __init__(self, logfile, tests): + self.tests = tests + self.log = Log(logfile) + + def shell(self, cmd, path=os.getcwd()): + if path: + os.chdir(path) + + self.log.log("==== shell session ===========================") + self.log.log("%s> %s" % (path, cmd)) + status = subprocess.call("cd %s; %s" % (path, cmd), shell=True, + stdout=self.log.f, stderr=self.log.f) + self.log.log("status = %s" % status) + self.log.log("============================================== \n\n") + return status + + def GetOutput(self, cmd, path=os.getcwd()): + if path: + os.chdir(path) + + self.log.log("==== shell session ===========================") + self.log.log("%s> %s" % (path, cmd)) + status, out = 
commands.getstatusoutput(cmd) + self.log.log("status = %s" % status) + self.log.log("out = %s" % out) + self.log.log("============================================== \n\n") + return status, out + +# +# Pre-commit checker +# + + +class PreCommitChecker(Env): + + def __init__(self, args): + Env.__init__(self, args.logfile, args.tests) + self.ignore_failure = args.ignore_failure + + # + # Get commands for a given job from the determinator file + # + def get_commands(self, test): + status, out = self.GetOutput( + "RATIO=1 build_tools/rocksdb-lego-determinator %s" % test, ".") + return status, out + + # + # Run a specific CI job + # + def run_test(self, test): + self.log.caption("Running test %s locally" % test) + + # get commands for the CI job determinator + status, cmds = self.get_commands(test) + if status != 0: + self.log.error("Error getting commands for test %s" % test) + return False + + # Parse the JSON to extract the commands to run + cmds = re.findall("'shell':'([^\']*)'", cmds) + + if len(cmds) == 0: + self.log.log("No commands found") + return False + + # Run commands + for cmd in cmds: + # Replace J=<..> with the local environment variable + if "J" in os.environ: + cmd = cmd.replace("J=1", "J=%s" % os.environ["J"]) + cmd = cmd.replace("make ", "make -j%s " % os.environ["J"]) + # Run the command + status = self.shell(cmd, ".") + if status != 0: + self.log.error("Error running command %s for test %s" + % (cmd, test)) + return False + + return True + + # + # Run specified CI jobs + # + def run_tests(self): + if not self.tests: + self.log.error("Invalid args. 
Please provide tests") + return False + + self.print_separator() + self.print_row("TEST", "RESULT") + self.print_separator() + + result = True + for test in self.tests: + start_time = time.time() + self.print_test(test) + result = self.run_test(test) + elapsed_min = (time.time() - start_time) / 60 + if not result: + self.log.error("Error running test %s" % test) + self.print_result("FAIL (%dm)" % elapsed_min) + if not self.ignore_failure: + return False + result = False + else: + self.print_result("PASS (%dm)" % elapsed_min) + + self.print_separator() + return result + + # + # Print a line + # + def print_separator(self): + print("".ljust(60, "-")) + + # + # Print two colums + # + def print_row(self, c0, c1): + print("%s%s" % (c0.ljust(40), c1.ljust(20))) + + def print_test(self, test): + print(test.ljust(40), end="") + sys.stdout.flush() + + def print_result(self, result): + print(result.ljust(20)) + +# +# Main +# +parser = argparse.ArgumentParser(description='RocksDB pre-commit checker.') + +# --log <logfile> +parser.add_argument('--logfile', default='/tmp/precommit-check.log', + help='Log file. Default is /tmp/precommit-check.log') +# --ignore_failure +parser.add_argument('--ignore_failure', action='store_true', default=False, + help='Stop when an error occurs') +# <test ....> +parser.add_argument('tests', nargs='+', + help='CI test(s) to run. e.g: unit punit asan tsan ubsan') + +args = parser.parse_args() +checker = PreCommitChecker(args) + +print("Please follow log %s" % checker.log.filename) + +if not checker.run_tests(): + print("Error running tests. 
Please check log file %s" + % checker.log.filename) + sys.exit(1) + +sys.exit(0) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/build_tools/rocksdb-lego-determinator ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/build_tools/rocksdb-lego-determinator b/thirdparty/rocksdb/build_tools/rocksdb-lego-determinator new file mode 100755 index 0000000..a40b306 --- /dev/null +++ b/thirdparty/rocksdb/build_tools/rocksdb-lego-determinator @@ -0,0 +1,805 @@ +#!/usr/bin/env bash +# This script is executed by Sandcastle +# to determine next steps to run + +# Usage: +# EMAIL=<email> ONCALL=<email> TRIGGER=<trigger> SUBSCRIBER=<email> rocks_ci.py <test-name> +# +# Input Value +# ------------------------------------------------------------------------- +# EMAIL Email address to report on trigger conditions +# ONCALL Email address to raise a task on failure +# TRIGGER Trigger conditions for email. Valid values are fail, warn, all +# SUBSCRIBER Email addresss to add as subscriber for task +# + +# +# Report configuration +# +REPORT_EMAIL= +if [ ! -z $EMAIL ]; then + if [ -z $TRIGGER ]; then + TRIGGER="fail" + fi + + REPORT_EMAIL=" + { + 'type':'email', + 'triggers': [ '$TRIGGER' ], + 'emails':['$EMAIL'] + }," +fi + +CREATE_TASK= +if [ ! -z $ONCALL ]; then + CREATE_TASK=" + { + 'type':'task', + 'triggers':[ 'fail' ], + 'priority':0, + 'subscribers':[ '$SUBSCRIBER' ], + 'tags':[ 'rocksdb', 'ci' ], + }," +fi + +# For now, create the tasks using only the dedicated task creation tool. +CREATE_TASK= + +REPORT= +if [[ ! -z $REPORT_EMAIL || ! 
-z $CREATE_TASK ]]; then + REPORT="'report': [ + $REPORT_EMAIL + $CREATE_TASK + ]" +fi + +# +# Helper variables +# +CLEANUP_ENV=" +{ + 'name':'Cleanup environment', + 'shell':'rm -rf /dev/shm/rocksdb && mkdir /dev/shm/rocksdb && (chmod +t /dev/shm || true) && make clean', + 'user':'root' +}" + +# We will eventually set the RATIO to 1, but we want do this +# in steps. RATIO=$(nproc) will make it work as J=1 +if [ -z $RATIO ]; then + RATIO=$(nproc) +fi + +if [ -z $PARALLEL_J ]; then + PARALLEL_J="J=$(expr $(nproc) / ${RATIO})" +fi + +if [ -z $PARALLEL_j ]; then + PARALLEL_j="-j$(expr $(nproc) / ${RATIO})" +fi + +PARALLELISM="$PARALLEL_J $PARALLEL_j" + +DEBUG="OPT=-g" +SHM="TEST_TMPDIR=/dev/shm/rocksdb" +NON_SHM="TMPD=/tmp/rocksdb_test_tmp" +GCC_481="ROCKSDB_FBCODE_BUILD_WITH_481=1" +ASAN="COMPILE_WITH_ASAN=1" +CLANG="USE_CLANG=1" +LITE="OPT=\"-DROCKSDB_LITE -g\"" +TSAN="COMPILE_WITH_TSAN=1" +UBSAN="COMPILE_WITH_UBSAN=1" +DISABLE_JEMALLOC="DISABLE_JEMALLOC=1" +HTTP_PROXY="https_proxy=http://fwdproxy.29.prn1:8080 http_proxy=http://fwdproxy.29.prn1:8080 ftp_proxy=http://fwdproxy.29.prn1:8080" +SETUP_JAVA_ENV="export $HTTP_PROXY; export JAVA_HOME=/usr/local/jdk-8u60-64/; export PATH=\$JAVA_HOME/bin:\$PATH" +PARSER="'parser':'python build_tools/error_filter.py $1'" + +CONTRUN_NAME="ROCKSDB_CONTRUN_NAME" + +# This code is getting called under various scenarios. What we care about is to +# understand when it's called from nightly contruns because in that case we'll +# create tasks for any failures. To follow the existing pattern, we'll check +# the value of $ONCALL. If it's a diff then just call `false` to make sure +# that errors will be properly propagated to the caller. +if [ ! 
-z $ONCALL ]; then + TASK_CREATION_TOOL="/usr/local/bin/mysql_mtr_filter --rocksdb --oncall $ONCALL" +else + TASK_CREATION_TOOL="false" +fi + +ARTIFACTS=" 'artifacts': [ + { + 'name':'database', + 'paths':[ '/dev/shm/rocksdb' ], + } +]" + +# +# A mechanism to disable tests temporarily +# +DISABLE_COMMANDS="[ + { + 'name':'Disable test', + 'oncall':'$ONCALL', + 'steps': [ + { + 'name':'Job disabled. Please contact test owner', + 'shell':'exit 1', + 'user':'root' + }, + ], + } +]" + +# +# RocksDB unit test +# +UNIT_TEST_COMMANDS="[ + { + 'name':'Rocksdb Unit Test', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build and test RocksDB debug version', + 'shell':'$SHM $DEBUG make $PARALLELISM check || $CONTRUN_NAME=check $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB unit test not under /dev/shm +# +UNIT_TEST_NON_SHM_COMMANDS="[ + { + 'name':'Rocksdb Unit Test', + 'oncall':'$ONCALL', + 'timeout': 86400, + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build and test RocksDB debug version', + 'timeout': 86400, + 'shell':'$NON_SHM $DEBUG make $PARALLELISM check || $CONTRUN_NAME=non_shm_check $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB release build and unit tests +# +RELEASE_BUILD_COMMANDS="[ + { + 'name':'Rocksdb Release Build', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build RocksDB release', + 'shell':'make $PARALLEL_j release || $CONTRUN_NAME=release $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB unit test on gcc-4.8.1 +# +UNIT_TEST_COMMANDS_481="[ + { + 'name':'Rocksdb Unit Test on GCC 4.8.1', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build and test RocksDB debug version', + 'shell':'$SHM $GCC_481 $DEBUG make $PARALLELISM check || $CONTRUN_NAME=unit_gcc_481_check $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB release build and 
unit tests +# +RELEASE_BUILD_COMMANDS_481="[ + { + 'name':'Rocksdb Release on GCC 4.8.1', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build RocksDB release on GCC 4.8.1', + 'shell':'$GCC_481 make $PARALLEL_j release || $CONTRUN_NAME=release_gcc481 $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB unit test with CLANG +# +CLANG_UNIT_TEST_COMMANDS="[ + { + 'name':'Rocksdb Unit Test', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build and test RocksDB debug', + 'shell':'$CLANG $SHM $DEBUG make $PARALLELISM check || $CONTRUN_NAME=clang_check $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB release build with CLANG +# +CLANG_RELEASE_BUILD_COMMANDS="[ + { + 'name':'Rocksdb CLANG Release Build', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build RocksDB release', + 'shell':'$CLANG make $PARALLEL_j release|| $CONTRUN_NAME=clang_release $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB analyze +# +CLANG_ANALYZE_COMMANDS="[ + { + 'name':'Rocksdb analyze', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'RocksDB build and analyze', + 'shell':'$CLANG $SHM $DEBUG make $PARALLEL_j analyze || $CONTRUN_NAME=clang_analyze $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB code coverage +# +CODE_COV_COMMANDS="[ + { + 'name':'Rocksdb Unit Test Code Coverage', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build, test and collect code coverage info', + 'shell':'$SHM $DEBUG make $PARALLELISM coverage || $CONTRUN_NAME=coverage $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB unity +# +UNITY_COMMANDS="[ + { + 'name':'Rocksdb Unity', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build, test unity test', + 'shell':'$SHM $DEBUG V=1 make J=1 unity_test || 
$CONTRUN_NAME=unity_test $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# Build RocksDB lite +# +LITE_BUILD_COMMANDS="[ + { + 'name':'Rocksdb Lite build', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build RocksDB debug version', + 'shell':'$LITE make J=1 static_lib || $CONTRUN_NAME=lite_static_lib $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB lite tests +# +LITE_UNIT_TEST_COMMANDS="[ + { + 'name':'Rocksdb Lite Unit Test', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build RocksDB debug version', + 'shell':'$SHM $LITE make J=1 check || $CONTRUN_NAME=lite_check $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB stress/crash test +# +STRESS_CRASH_TEST_COMMANDS="[ + { + 'name':'Rocksdb Stress/Crash Test', + 'oncall':'$ONCALL', + 'timeout': 86400, + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build and run RocksDB debug stress tests', + 'shell':'$SHM $DEBUG make J=1 db_stress || $CONTRUN_NAME=db_stress $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + { + 'name':'Build and run RocksDB debug crash tests', + 'timeout': 86400, + 'shell':'$SHM $DEBUG make J=1 crash_test || $CONTRUN_NAME=crash_test $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + } + ], + $ARTIFACTS, + $REPORT + } +]" + +# RocksDB write stress test. +# We run on disk device on purpose (i.e. 
no $SHM) +# because we want to add some randomness to fsync commands +WRITE_STRESS_COMMANDS="[ + { + 'name':'Rocksdb Write Stress Test', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build and run RocksDB write stress tests', + 'shell':'make write_stress && python tools/write_stress_runner.py --runtime_sec=3600 --db=/tmp/rocksdb_write_stress || $CONTRUN_NAME=write_stress $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + } + ], + 'artifacts': [{'name': 'database', 'paths': ['/tmp/rocksdb_write_stress']}], + $REPORT + } +]" + + +# +# RocksDB test under address sanitizer +# +ASAN_TEST_COMMANDS="[ + { + 'name':'Rocksdb Unit Test under ASAN', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Test RocksDB debug under ASAN', +'shell':'set -o pipefail && ($SHM $ASAN $DEBUG make $PARALLELISM asan_check || $CONTRUN_NAME=asan_check $TASK_CREATION_TOOL) |& /usr/facebook/ops/scripts/asan_symbolize.py -d', + 'user':'root', + $PARSER + } + ], + $REPORT + } +]" + +# +# RocksDB crash testing under address sanitizer +# +ASAN_CRASH_TEST_COMMANDS="[ + { + 'name':'Rocksdb crash test under ASAN', + 'oncall':'$ONCALL', + 'timeout': 86400, + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build and run RocksDB debug asan_crash_test', + 'timeout': 86400, + 'shell':'$SHM $DEBUG make J=1 asan_crash_test || $CONTRUN_NAME=asan_crash_test $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB test under undefined behavior sanitizer +# +UBSAN_TEST_COMMANDS="[ + { + 'name':'Rocksdb Unit Test under UBSAN', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Test RocksDB debug under UBSAN', + 'shell':'set -o pipefail && $SHM $UBSAN $DEBUG make $PARALLELISM ubsan_check || $CONTRUN_NAME=ubsan_check $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + } + ], + $REPORT + } +]" + +# +# RocksDB crash testing under udnefined behavior sanitizer +# +UBSAN_CRASH_TEST_COMMANDS="[ + { + 'name':'Rocksdb crash test under UBSAN', + 
'oncall':'$ONCALL', + 'timeout': 86400, + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build and run RocksDB debug ubsan_crash_test', + 'timeout': 86400, + 'shell':'$SHM $DEBUG make J=1 ubsan_crash_test || $CONTRUN_NAME=ubsan_crash_test $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB unit test under valgrind +# +VALGRIND_TEST_COMMANDS="[ + { + 'name':'Rocksdb Unit Test under valgrind', + 'oncall':'$ONCALL', + 'timeout': 86400, + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Run RocksDB debug unit tests', + 'timeout': 86400, + 'shell':'$SHM $DEBUG make $PARALLELISM valgrind_test || $CONTRUN_NAME=valgrind_check $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB test under TSAN +# +TSAN_UNIT_TEST_COMMANDS="[ + { + 'name':'Rocksdb Unit Test under TSAN', + 'oncall':'$ONCALL', + 'timeout': 86400, + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Run RocksDB debug unit test', + 'timeout': 86400, + 'shell':'set -o pipefail && $SHM $DEBUG $TSAN make $PARALLELISM check || $CONTRUN_NAME=tsan_check $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB crash test under TSAN +# +TSAN_CRASH_TEST_COMMANDS="[ + { + 'name':'Rocksdb Crash Test under TSAN', + 'oncall':'$ONCALL', + 'timeout': 86400, + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Compile and run', + 'timeout': 86400, + 'shell':'set -o pipefail && $SHM $DEBUG $TSAN CRASH_TEST_KILL_ODD=1887 CRASH_TEST_EXT_ARGS=--log2_keys_per_lock=22 make J=1 crash_test || $CONTRUN_NAME=tsan_crash_test $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB format compatible +# + +run_format_compatible() +{ + export TEST_TMPDIR=/dev/shm/rocksdb + rm -rf /dev/shm/rocksdb + mkdir /dev/shm/rocksdb + + tools/check_format_compatible.sh +} + +FORMAT_COMPATIBLE_COMMANDS="[ + { + 'name':'Rocksdb Format Compatible tests', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Run RocksDB debug 
unit test', + 'shell':'build_tools/rocksdb-lego-determinator run_format_compatible || $CONTRUN_NAME=run_format_compatible $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB no compression +# +run_no_compression() +{ + export TEST_TMPDIR=/dev/shm/rocksdb + rm -rf /dev/shm/rocksdb + mkdir /dev/shm/rocksdb + make clean + cat build_tools/fbcode_config.sh | grep -iv dzlib | grep -iv dlz4 | grep -iv dsnappy | grep -iv dbzip2 > .tmp.fbcode_config.sh + mv .tmp.fbcode_config.sh build_tools/fbcode_config.sh + cat Makefile | grep -v tools/ldb_test.py > .tmp.Makefile + mv .tmp.Makefile Makefile + make $DEBUG J=1 check +} + +NO_COMPRESSION_COMMANDS="[ + { + 'name':'Rocksdb No Compression tests', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Run RocksDB debug unit test', + 'shell':'build_tools/rocksdb-lego-determinator run_no_compression || $CONTRUN_NAME=run_no_compression $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB regression +# +run_regression() +{ + time -v bash -vx ./build_tools/regression_build_test.sh $(mktemp -d $WORKSPACE/leveldb.XXXX) $(mktemp leveldb_test_stats.XXXX) + + # ======= report size to ODS ======== + + # parameters: $1 -- key, $2 -- value + function send_size_to_ods { + curl -s "https://www.intern.facebook.com/intern/agent/ods_set.php?entity=rocksdb_build&key=rocksdb.build_size.$1&value=$2" \ + --connect-timeout 60 + } + + # === normal build === + make clean + make -j$(nproc) static_lib + send_size_to_ods static_lib $(stat --printf="%s" librocksdb.a) + strip librocksdb.a + send_size_to_ods static_lib_stripped $(stat --printf="%s" librocksdb.a) + + make -j$(nproc) shared_lib + send_size_to_ods shared_lib $(stat --printf="%s" `readlink -f librocksdb.so`) + strip `readlink -f librocksdb.so` + send_size_to_ods shared_lib_stripped $(stat --printf="%s" `readlink -f librocksdb.so`) + + # === lite build === + make clean + OPT=-DROCKSDB_LITE make -j$(nproc) 
static_lib + send_size_to_ods static_lib_lite $(stat --printf="%s" librocksdb.a) + strip librocksdb.a + send_size_to_ods static_lib_lite_stripped $(stat --printf="%s" librocksdb.a) + + OPT=-DROCKSDB_LITE make -j$(nproc) shared_lib + send_size_to_ods shared_lib_lite $(stat --printf="%s" `readlink -f librocksdb.so`) + strip `readlink -f librocksdb.so` + send_size_to_ods shared_lib_lite_stripped $(stat --printf="%s" `readlink -f librocksdb.so`) +} + +REGRESSION_COMMANDS="[ + { + 'name':'Rocksdb regression commands', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Make and run script', + 'shell':'build_tools/rocksdb-lego-determinator run_regression || $CONTRUN_NAME=run_regression $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + +# +# RocksDB Java build +# +JAVA_BUILD_TEST_COMMANDS="[ + { + 'name':'Rocksdb Java Build', + 'oncall':'$ONCALL', + 'steps': [ + $CLEANUP_ENV, + { + 'name':'Build RocksDB for Java', + 'shell':'$SETUP_JAVA_ENV; $SHM make rocksdbjava || $CONTRUN_NAME=rocksdbjava $TASK_CREATION_TOOL', + 'user':'root', + $PARSER + }, + ], + $REPORT + } +]" + + +case $1 in + unit) + echo $UNIT_TEST_COMMANDS + ;; + unit_non_shm) + echo $UNIT_TEST_NON_SHM_COMMANDS + ;; + release) + echo $RELEASE_BUILD_COMMANDS + ;; + unit_481) + echo $UNIT_TEST_COMMANDS_481 + ;; + release_481) + echo $RELEASE_BUILD_COMMANDS_481 + ;; + clang_unit) + echo $CLANG_UNIT_TEST_COMMANDS + ;; + clang_release) + echo $CLANG_RELEASE_BUILD_COMMANDS + ;; + clang_analyze) + echo $CLANG_ANALYZE_COMMANDS + ;; + code_cov) + echo $CODE_COV_COMMANDS + ;; + unity) + echo $UNITY_COMMANDS + ;; + lite) + echo $LITE_BUILD_COMMANDS + ;; + lite_test) + echo $LITE_UNIT_TEST_COMMANDS + ;; + stress_crash) + echo $STRESS_CRASH_TEST_COMMANDS + ;; + write_stress) + echo $WRITE_STRESS_COMMANDS + ;; + asan) + echo $ASAN_TEST_COMMANDS + ;; + asan_crash) + echo $ASAN_CRASH_TEST_COMMANDS + ;; + ubsan) + echo $UBSAN_TEST_COMMANDS + ;; + ubsan_crash) + echo 
$UBSAN_CRASH_TEST_COMMANDS + ;; + valgrind) + echo $VALGRIND_TEST_COMMANDS + ;; + tsan) + echo $TSAN_UNIT_TEST_COMMANDS + ;; + tsan_crash) + echo $TSAN_CRASH_TEST_COMMANDS + ;; + format_compatible) + echo $FORMAT_COMPATIBLE_COMMANDS + ;; + run_format_compatible) + run_format_compatible + ;; + no_compression) + echo $NO_COMPRESSION_COMMANDS + ;; + run_no_compression) + run_no_compression + ;; + regression) + echo $REGRESSION_COMMANDS + ;; + run_regression) + run_regression + ;; + java_build) + echo $JAVA_BUILD_TEST_COMMANDS + ;; + *) + echo "Invalid determinator command" + ;; +esac http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/build_tools/update_dependencies.sh ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/build_tools/update_dependencies.sh b/thirdparty/rocksdb/build_tools/update_dependencies.sh new file mode 100755 index 0000000..c7b9932 --- /dev/null +++ b/thirdparty/rocksdb/build_tools/update_dependencies.sh @@ -0,0 +1,131 @@ +#!/bin/sh +# +# Update dependencies.sh file with the latest avaliable versions + +BASEDIR=$(dirname $0) +OUTPUT="" + +function log_variable() +{ + echo "$1=${!1}" >> "$OUTPUT" +} + + +TP2_LATEST="/mnt/vol/engshare/fbcode/third-party2" +## $1 => lib name +## $2 => lib version (if not provided, will try to pick latest) +## $3 => platform (if not provided, will try to pick latest gcc) +## +## get_lib_base will set a variable named ${LIB_NAME}_BASE to the lib location +function get_lib_base() +{ + local lib_name=$1 + local lib_version=$2 + local lib_platform=$3 + + local result="$TP2_LATEST/$lib_name/" + + # Lib Version + if [ -z "$lib_version" ] || [ "$lib_version" = "LATEST" ]; then + # version is not provided, use latest + result=`ls -dr1v $result/*/ | head -n1` + else + result="$result/$lib_version/" + fi + + # Lib Platform + if [ -z "$lib_platform" ]; then + # platform is not provided, use latest gcc + result=`ls -dr1v $result/gcc-*[^fb]/ | 
head -n1` + else + result="$result/$lib_platform/" + fi + + result=`ls -1d $result/*/ | head -n1` + + # lib_name => LIB_NAME_BASE + local __res_var=${lib_name^^}"_BASE" + __res_var=`echo $__res_var | tr - _` + # LIB_NAME_BASE=$result + eval $__res_var=`readlink -f $result` + + log_variable $__res_var +} + +########################################################### +# 5.x dependencies # +########################################################### + +OUTPUT="$BASEDIR/dependencies.sh" + +rm -f "$OUTPUT" +touch "$OUTPUT" + +echo "Writing dependencies to $OUTPUT" + +# Compilers locations +GCC_BASE=`readlink -f $TP2_LATEST/gcc/5.x/centos6-native/*/` +CLANG_BASE=`readlink -f $TP2_LATEST/llvm-fb/stable/centos6-native/*/` + +log_variable GCC_BASE +log_variable CLANG_BASE + +# Libraries locations +get_lib_base libgcc 5.x +get_lib_base glibc 2.23 +get_lib_base snappy LATEST gcc-5-glibc-2.23 +get_lib_base zlib LATEST +get_lib_base bzip2 LATEST +get_lib_base lz4 LATEST +get_lib_base zstd LATEST +get_lib_base gflags LATEST +get_lib_base jemalloc LATEST +get_lib_base numa LATEST +get_lib_base libunwind LATEST +get_lib_base tbb 4.0_update2 gcc-5-glibc-2.23 + +get_lib_base kernel-headers LATEST +get_lib_base binutils LATEST centos6-native +get_lib_base valgrind 3.10.0 gcc-5-glibc-2.23 +get_lib_base lua 5.2.3 gcc-5-glibc-2.23 + +git diff $OUTPUT + +########################################################### +# 4.8.1 dependencies # +########################################################### + +OUTPUT="$BASEDIR/dependencies_4.8.1.sh" + +rm -f "$OUTPUT" +touch "$OUTPUT" + +echo "Writing 4.8.1 dependencies to $OUTPUT" + +# Compilers locations +GCC_BASE=`readlink -f $TP2_LATEST/gcc/4.8.1/centos6-native/*/` +CLANG_BASE=`readlink -f $TP2_LATEST/llvm-fb/stable/centos6-native/*/` + +log_variable GCC_BASE +log_variable CLANG_BASE + +# Libraries locations +get_lib_base libgcc 4.8.1 gcc-4.8.1-glibc-2.17 +get_lib_base glibc 2.17 gcc-4.8.1-glibc-2.17 +get_lib_base snappy LATEST 
gcc-4.8.1-glibc-2.17 +get_lib_base zlib LATEST gcc-4.8.1-glibc-2.17 +get_lib_base bzip2 LATEST gcc-4.8.1-glibc-2.17 +get_lib_base lz4 LATEST gcc-4.8.1-glibc-2.17 +get_lib_base zstd LATEST gcc-4.8.1-glibc-2.17 +get_lib_base gflags LATEST gcc-4.8.1-glibc-2.17 +get_lib_base jemalloc LATEST gcc-4.8.1-glibc-2.17 +get_lib_base numa LATEST gcc-4.8.1-glibc-2.17 +get_lib_base libunwind LATEST gcc-4.8.1-glibc-2.17 +get_lib_base tbb 4.0_update2 gcc-4.8.1-glibc-2.17 + +get_lib_base kernel-headers LATEST gcc-4.8.1-glibc-2.17 +get_lib_base binutils LATEST centos6-native +get_lib_base valgrind 3.8.1 gcc-4.8.1-glibc-2.17 +get_lib_base lua 5.2.3 centos6-native + +git diff $OUTPUT http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/build_tools/version.sh ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/build_tools/version.sh b/thirdparty/rocksdb/build_tools/version.sh new file mode 100755 index 0000000..f3ca98c --- /dev/null +++ b/thirdparty/rocksdb/build_tools/version.sh @@ -0,0 +1,22 @@ +#!/bin/sh +if [ "$#" = "0" ]; then + echo "Usage: $0 major|minor|patch|full" + exit 1 +fi + +if [ "$1" = "major" ]; then + cat include/rocksdb/version.h | grep MAJOR | head -n1 | awk '{print $3}' +fi +if [ "$1" = "minor" ]; then + cat include/rocksdb/version.h | grep MINOR | head -n1 | awk '{print $3}' +fi +if [ "$1" = "patch" ]; then + cat include/rocksdb/version.h | grep PATCH | head -n1 | awk '{print $3}' +fi +if [ "$1" = "full" ]; then + awk '/#define ROCKSDB/ { env[$2] = $3 } + END { printf "%s.%s.%s\n", env["ROCKSDB_MAJOR"], + env["ROCKSDB_MINOR"], + env["ROCKSDB_PATCH"] }' \ + include/rocksdb/version.h +fi http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/cache/cache_bench.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/cache/cache_bench.cc b/thirdparty/rocksdb/cache/cache_bench.cc new file mode 100644 index 
0000000..16c2ced
--- /dev/null
+++ b/thirdparty/rocksdb/cache/cache_bench.cc
@@ -0,0 +1,292 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+#ifndef GFLAGS
+#include <cstdio>
+int main() {
+  fprintf(stderr, "Please install gflags to run rocksdb tools\n");
+  return 1;
+}
+#else
+
+#include <inttypes.h>
+#include <sys/types.h>
+#include <stdio.h>
+#include <gflags/gflags.h>
+
+#include "rocksdb/db.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/env.h"
+#include "port/port.h"
+#include "util/mutexlock.h"
+#include "util/random.h"
+
+using GFLAGS::ParseCommandLineFlags;
+
+static const uint32_t KB = 1024;
+
+DEFINE_int32(threads, 16, "Number of concurrent threads to run.");
+DEFINE_int64(cache_size, 8 * KB * KB,
+             "Number of bytes to use as a cache of uncompressed data.");
+DEFINE_int32(num_shard_bits, 4, "shard_bits.");
+
+DEFINE_int64(max_key, 1 * KB * KB * KB, "Max number of key to place in cache");
+DEFINE_uint64(ops_per_thread, 1200000, "Number of operations per thread.");
+
+DEFINE_bool(populate_cache, false, "Populate cache before operations");
+DEFINE_int32(insert_percent, 40,
+             "Ratio of insert to total workload (expressed as a percentage)");
+DEFINE_int32(lookup_percent, 50,
+             "Ratio of lookup to total workload (expressed as a percentage)");
+DEFINE_int32(erase_percent, 10,
+             "Ratio of erase to total workload (expressed as a percentage)");
+
+DEFINE_bool(use_clock_cache, false, "");
+
+namespace rocksdb {
+
+class CacheBench;
+namespace {
+// Deleter for the benchmark's cache values, which are allocated with
+// `new char[10]`; an array allocation must be released with delete[].
+void deleter(const Slice& key, void* value) {
+  delete[] static_cast<char*>(value);
+}
+
+// State shared by all concurrent executions of the same benchmark.
+class SharedState {
+ public:
+  explicit SharedState(CacheBench* cache_bench)
+      : cv_(&mu_),
+        num_threads_(FLAGS_threads),
+        num_initialized_(0),
+        start_(false),
+        num_done_(0),
+        cache_bench_(cache_bench) {
+  }
+
+  ~SharedState() {}
+
+  port::Mutex* GetMutex() {
+    return &mu_;
+  }
+
+  port::CondVar* GetCondVar() {
+    return &cv_;
+  }
+
+  CacheBench* GetCacheBench() const {
+    return cache_bench_;
+  }
+
+  void IncInitialized() {
+    num_initialized_++;
+  }
+
+  void IncDone() {
+    num_done_++;
+  }
+
+  bool AllInitialized() const {
+    return num_initialized_ >= num_threads_;
+  }
+
+  bool AllDone() const {
+    return num_done_ >= num_threads_;
+  }
+
+  void SetStart() {
+    start_ = true;
+  }
+
+  bool Started() const {
+    return start_;
+  }
+
+ private:
+  port::Mutex mu_;
+  port::CondVar cv_;
+
+  const uint64_t num_threads_;
+  uint64_t num_initialized_;
+  bool start_;
+  uint64_t num_done_;
+
+  CacheBench* cache_bench_;
+};
+
+// Per-thread state for concurrent executions of the same benchmark.
+struct ThreadState {
+  uint32_t tid;
+  Random rnd;
+  SharedState* shared;
+
+  ThreadState(uint32_t index, SharedState* _shared)
+      : tid(index), rnd(1000 + index), shared(_shared) {}
+};
+}  // namespace
+
+class CacheBench {
+ public:
+  CacheBench() : num_threads_(FLAGS_threads) {
+    if (FLAGS_use_clock_cache) {
+      cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits);
+      if (!cache_) {
+        fprintf(stderr, "Clock cache not supported.\n");
+        exit(1);
+      }
+    } else {
+      cache_ = NewLRUCache(FLAGS_cache_size, FLAGS_num_shard_bits);
+    }
+  }
+
+  ~CacheBench() {}
+
+  void PopulateCache() {
+    Random rnd(1);
+    for (int64_t i = 0; i < FLAGS_cache_size; i++) {
+      uint64_t rand_key = rnd.Next() % FLAGS_max_key;
+      // Cast uint64* to be char*, data would be copied to cache
+      Slice key(reinterpret_cast<char*>(&rand_key), 8);
+      // do insert
+      cache_->Insert(key, new char[10], 1, &deleter);
+    }
+  }
+
+  bool Run() {
+    rocksdb::Env* env = rocksdb::Env::Default();
+
+    PrintEnv();
+    SharedState shared(this);
+    // Per-thread states are owned here and freed when Run() returns; by then
+    // every worker has called IncDone() and no longer touches its state.
+    std::vector<std::unique_ptr<ThreadState>> threads(num_threads_);
+    for (uint32_t i = 0; i < num_threads_; i++) {
+      threads[i].reset(new ThreadState(i, &shared));
+      env->StartThread(ThreadBody, threads[i].get());
+    }
+    {
+      MutexLock l(shared.GetMutex());
+      while (!shared.AllInitialized()) {
+        shared.GetCondVar()->Wait();
+      }
+      // Record start time
+      uint64_t start_time = env->NowMicros();
+
+      // Start all threads
+      shared.SetStart();
+      shared.GetCondVar()->SignalAll();
+
+      // Wait for all threads to complete
+      while (!shared.AllDone()) {
+        shared.GetCondVar()->Wait();
+      }
+
+      // Record end time
+      uint64_t end_time = env->NowMicros();
+      double elapsed = static_cast<double>(end_time - start_time) * 1e-6;
+      uint32_t qps = static_cast<uint32_t>(
+          static_cast<double>(FLAGS_threads * FLAGS_ops_per_thread) / elapsed);
+      fprintf(stdout, "Complete in %.3f s; QPS = %u\n", elapsed, qps);
+    }
+    return true;
+  }
+
+ private:
+  std::shared_ptr<Cache> cache_;
+  uint32_t num_threads_;
+
+  static void ThreadBody(void* v) {
+    ThreadState* thread = reinterpret_cast<ThreadState*>(v);
+    SharedState* shared = thread->shared;
+
+    {
+      MutexLock l(shared->GetMutex());
+      shared->IncInitialized();
+      if (shared->AllInitialized()) {
+        shared->GetCondVar()->SignalAll();
+      }
+      while (!shared->Started()) {
+        shared->GetCondVar()->Wait();
+      }
+    }
+    thread->shared->GetCacheBench()->OperateCache(thread);
+
+    {
+      MutexLock l(shared->GetMutex());
+      shared->IncDone();
+      if (shared->AllDone()) {
+        shared->GetCondVar()->SignalAll();
+      }
+    }
+  }
+
+  // Runs FLAGS_ops_per_thread random operations on the shared cache.
+  // Consecutive ranges of prob_op in [0, 100) select the operation:
+  // [0, insert) -> insert, the next lookup_percent values -> lookup, the
+  // next erase_percent values -> erase; any remainder is a no-op.
+  void OperateCache(ThreadState* thread) {
+    for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) {
+      uint64_t rand_key = thread->rnd.Next() % FLAGS_max_key;
+      // Cast uint64* to be char*, data would be copied to cache
+      Slice key(reinterpret_cast<char*>(&rand_key), 8);
+      int32_t prob_op = thread->rnd.Uniform(100);
+      if (prob_op < FLAGS_insert_percent) {
+        // do insert
+        cache_->Insert(key, new char[10], 1, &deleter);
+      } else if (prob_op < FLAGS_insert_percent + FLAGS_lookup_percent) {
+        // do lookup
+        auto handle = cache_->Lookup(key);
+        if (handle) {
+          cache_->Release(handle);
+        }
+      } else if (prob_op < FLAGS_insert_percent + FLAGS_lookup_percent +
+                               FLAGS_erase_percent) {
+        // do erase
+        cache_->Erase(key);
+      }
+    }
+  }
+
+  void PrintEnv() const {
+    printf("RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion);
+    printf("Number of threads : %d\n", FLAGS_threads);
+    printf("Ops per thread : %" PRIu64 "\n", FLAGS_ops_per_thread);
+    // cache_size and max_key are signed int64 flags, so print with PRId64.
+    printf("Cache size : %" PRId64 "\n", FLAGS_cache_size);
+    printf("Num shard bits : %d\n", FLAGS_num_shard_bits);
+    printf("Max key : %" PRId64 "\n", FLAGS_max_key);
+    printf("Populate cache : %d\n", FLAGS_populate_cache);
+    printf("Insert percentage : %d%%\n", FLAGS_insert_percent);
+    printf("Lookup percentage : %d%%\n", FLAGS_lookup_percent);
+    printf("Erase percentage : %d%%\n", FLAGS_erase_percent);
+    printf("----------------------------\n");
+  }
+};
+}  // namespace rocksdb
+
+int main(int argc, char** argv) {
+  ParseCommandLineFlags(&argc, &argv, true);
+
+  if (FLAGS_threads <= 0) {
+    fprintf(stderr, "threads number <= 0\n");
+    exit(1);
+  }
+
+  rocksdb::CacheBench bench;
+  if (FLAGS_populate_cache) {
+    bench.PopulateCache();
+  }
+  if (bench.Run()) {
+    return 0;
+  } else {
+    return 1;
+  }
+}
+
+#endif  // GFLAGS
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/cache/clock_cache.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/cache/clock_cache.cc b/thirdparty/rocksdb/cache/clock_cache.cc
new file mode 100644
index 0000000..7e42714
--- /dev/null
+++ b/thirdparty/rocksdb/cache/clock_cache.cc
@@ -0,0 +1,729 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "cache/clock_cache.h"
+
+#ifndef SUPPORT_CLOCK_CACHE
+
+namespace rocksdb {
+
+std::shared_ptr<Cache> NewClockCache(size_t capacity, int num_shard_bits,
+                                     bool strict_capacity_limit) {
+  // Clock cache not supported.
+  return nullptr;
+}
+
+}  // namespace rocksdb
+
+#else
+
+#include <assert.h>
+#include <atomic>
+#include <deque>
+
+// "tbb/concurrent_hash_map.h" requires RTTI if exception is enabled.
+// Disable it so users can choose to disable RTTI.
+#ifndef ROCKSDB_USE_RTTI
+#define TBB_USE_EXCEPTIONS 0
+#endif
+#include "tbb/concurrent_hash_map.h"
+
+#include "cache/sharded_cache.h"
+#include "port/port.h"
+#include "util/autovector.h"
+#include "util/mutexlock.h"
+
+namespace rocksdb {
+
+namespace {
+
+// An implementation of the Cache interface based on CLOCK algorithm, with
+// better concurrent performance than LRUCache. The idea of CLOCK algorithm
+// is to maintain all cache entries in a circular list, and an iterator
+// (the "head") pointing to the last examined entry. Eviction starts from the
+// current head. Each entry is given a second chance before eviction, if it
+// has been accessed since it was last examined. In contrast to LRU, no
+// modification to the internal data-structure (except for flipping the usage
+// bit) needs to be done upon lookup. This gives us the opportunity to
+// implement a cache with better concurrency.
+//
+// Each cache entry is represented by a cache handle, and all the handles
+// are arranged in a circular list, as described above. Upon erase of an
+// entry, we never remove the handle. Instead, the handle is put into a
+// recycle bin to be reused. This is to avoid memory deallocation, which is
+// hard to deal with in a concurrent environment.
+//
+// The cache also maintains a concurrent hash map for lookup. Any concurrent
+// hash map implementation should do the work. We currently use
+// tbb::concurrent_hash_map because it supports concurrent erase.
+//
+// Each cache handle has the following flags and counters, which are squeezed
+// into an atomic integer, to make sure the handle is always in a consistent
+// state:
+//
+//   * In-cache bit: whether the entry is referenced by the cache itself. If
+//     an entry is in cache, its key would also be available in the hash map.
+//   * Usage bit: whether the entry has been accessed by a user since it was
+//     last examined for eviction. Can be reset by eviction.
+//   * Reference count: reference count by user.
+//
+// An entry can be referenced only when it's in cache. An entry can be evicted
+// only when it is in cache, has no usage since it was last examined, and its
+// reference count is zero.
+//
+// The following figure shows a possible layout of the cache. Boxes represent
+// cache handles and numbers in each box being in-cache bit, usage bit and
+// reference count respectively.
+//
+//  hash map:
+//      +-------+--------+
+//      |  key  | handle |
+//      +-------+--------+
+//      | "foo" |    5   |-------------------------------------+
+//      +-------+--------+                                     |
+//      | "bar" |    2   |--+                                  |
+//      +-------+--------+  |                                  |
+//                          |                                  |
+//                     head |                                  |
+//                       |  |                                  |
+//  circular list:       |  |                                  |
+//         +-------+   +-------+   +-------+   +-------+   +-------+   +-------
+//         |(0,0,0)|---|(1,1,0)|---|(0,0,0)|---|(0,1,3)|---|(1,0,0)|---|  ...
+//         +-------+   +-------+   +-------+   +-------+   +-------+   +-------
+//         |                       |
+//         +-------+   +-----------+
+//                 |   |
+//               +---+---+
+//  recycle bin: | 1 | 3 |
+//               +---+---+
+//
+// Suppose we try to insert "baz" into the cache at this point and the cache is
+// full. The cache will first look for entries to evict, starting from where
+// head points to (the second entry). It resets the usage bit of the second
+// entry, skips the third and fourth entries since they are not in cache (but
+// still clears the fourth one's usage bit), and finally evicts the fifth entry
+// ("foo"). It grabs handle 3 from the recycle bin and inserts the key into it.
+// The following figure shows the resulting layout.
+//
+//  hash map:
+//      +-------+--------+
+//      |  key  | handle |
+//      +-------+--------+
+//      | "baz" |    3   |-------------+
+//      +-------+--------+             |
+//      | "bar" |    2   |--+          |
+//      +-------+--------+  |          |
+//                          |          |
+//                          |          |                               head
+//                          |          |                                 |
+//  circular list:          |          |                                 |
+//         +-------+   +-------+   +-------+   +-------+   +-------+   +-------
+//         |(0,0,0)|---|(1,0,0)|---|(1,0,0)|---|(0,0,3)|---|(0,0,0)|---|  ...
+//         +-------+   +-------+   +-------+   +-------+   +-------+   +-------
+//         |                                               |
+//         +-------+   +-----------------------------------+
+//                 |   |
+//               +---+---+
+//  recycle bin: | 1 | 5 |
+//               +---+---+
+//
+// A global mutex guards the circular list, the head, and the recycle bin.
+// We additionally require that modifying the hash map needs to hold the mutex.
+// As such, modifying the cache (such as Insert() and Erase()) requires
+// holding the mutex. Lookup() only accesses the hash map and the flags
+// associated with each handle, and doesn't require explicit locking. Release()
+// acquires the mutex only when it drops the last reference to an entry that is
+// no longer in cache, or when force_erase is set. A future improvement could
+// be to remove the mutex completely.
+//
+// Benchmark:
+// We run readrandom db_bench on a test DB of size 13GB, with size of each
+// level:
+//
+//    Level    Files   Size(MB)
+//   -------------------------
+//     L0        1       0.01
+//     L1       18      17.32
+//     L2      230     182.94
+//     L3     1186    1833.63
+//     L4     4602    8140.30
+//
+// We test with both 32 and 16 read threads, with 2GB cache size (the whole DB
+// doesn't fit in) and 64GB cache size (the whole DB can fit in cache), and
+// whether to put index and filter blocks in block cache. The benchmark runs
+// with RocksDB 4.10. We got the following result:
+//
+// Threads Cache     Cache               ClockCache               LRUCache
+//         Size  Index/Filter Throughput(MB/s)   Hit Throughput(MB/s)    Hit
+//     32   2GB       yes               466.7  85.9%           433.7   86.5%
+//     32   2GB       no                529.9  72.7%           532.7   73.9%
+//     32  64GB       yes               649.9  99.9%           507.9   99.9%
+//     32  64GB       no                740.4  99.9%           662.8   99.9%
+//     16   2GB       yes               278.4  85.9%           283.4   86.5%
+//     16   2GB       no                318.6  72.7%           335.8   73.9%
+//     16  64GB       yes               391.9  99.9%           353.3   99.9%
+//     16  64GB       no                433.8  99.8%           419.4   99.8%
+
+// Cache entry meta data.
+struct CacheHandle {
+  Slice key;
+  uint32_t hash;
+  void* value;
+  size_t charge;
+  void (*deleter)(const Slice&, void* value);
+
+  // Flags and counters associated with the cache handle:
+  //   lowest bit: in-cache bit
+  //   second lowest bit: usage bit
+  //   the rest bits: reference count
+  // A handle is unused when it is neither in cache nor referenced. The
+  // thread that reaches that state recycles it and schedules cleanup.
+  std::atomic<uint32_t> flags;
+
+  CacheHandle() = default;
+
+  CacheHandle(const CacheHandle& a) { *this = a; }
+
+  CacheHandle(const Slice& k, void* v,
+              void (*del)(const Slice& key, void* value))
+      : key(k), value(v), deleter(del) {}
+
+  CacheHandle& operator=(const CacheHandle& a) {
+    // Only copy members needed for deletion.
+    key = a.key;
+    value = a.value;
+    deleter = a.deleter;
+    return *this;
+  }
+};
+
+// Key of hash map. We store hash value with the key for convenience.
+struct CacheKey {
+  Slice key;
+  uint32_t hash_value;
+
+  CacheKey() = default;
+
+  CacheKey(const Slice& k, uint32_t h) {
+    key = k;
+    hash_value = h;
+  }
+
+  static bool equal(const CacheKey& a, const CacheKey& b) {
+    return a.hash_value == b.hash_value && a.key == b.key;
+  }
+
+  static size_t hash(const CacheKey& a) {
+    return static_cast<size_t>(a.hash_value);
+  }
+};
+
+struct CleanupContext {
+  // List of values to be deleted, along with the key and deleter.
+  autovector<CacheHandle> to_delete_value;
+
+  // List of keys to be deleted.
+  autovector<const char*> to_delete_key;
+};
+
+// A cache shard which maintains its own CLOCK cache.
+class ClockCacheShard : public CacheShard {
+ public:
+  // Hash map type.
+  typedef tbb::concurrent_hash_map<CacheKey, CacheHandle*, CacheKey> HashTable;
+
+  ClockCacheShard();
+  ~ClockCacheShard();
+
+  // Interfaces
+  virtual void SetCapacity(size_t capacity) override;
+  virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override;
+  virtual Status Insert(const Slice& key, uint32_t hash, void* value,
+                        size_t charge,
+                        void (*deleter)(const Slice& key, void* value),
+                        Cache::Handle** handle,
+                        Cache::Priority priority) override;
+  virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) override;
+  // If the entry is in cache, increase reference count and return true.
+  // Return false otherwise.
+  //
+  // Not necessary to hold mutex_ before being called.
+  virtual bool Ref(Cache::Handle* handle) override;
+  virtual bool Release(Cache::Handle* handle,
+                       bool force_erase = false) override;
+  virtual void Erase(const Slice& key, uint32_t hash) override;
+  bool EraseAndConfirm(const Slice& key, uint32_t hash,
+                       CleanupContext* context);
+  virtual size_t GetUsage() const override;
+  virtual size_t GetPinnedUsage() const override;
+  virtual void EraseUnRefEntries() override;
+  virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
+                                      bool thread_safe) override;
+
+ private:
+  static const uint32_t kInCacheBit = 1;
+  static const uint32_t kUsageBit = 2;
+  static const uint32_t kRefsOffset = 2;
+  static const uint32_t kOneRef = 1 << kRefsOffset;
+
+  // Helper functions to extract cache handle flags and counters.
+  static bool InCache(uint32_t flags) { return flags & kInCacheBit; }
+  static bool HasUsage(uint32_t flags) { return flags & kUsageBit; }
+  static uint32_t CountRefs(uint32_t flags) { return flags >> kRefsOffset; }
+
+  // Decrease reference count of the entry. If this decreases the count to 0,
+  // recycle the entry. If set_usage is true, also set the usage bit.
+  //
+  // returns true if a value is erased.
+  //
+  // Not necessary to hold mutex_ before being called.
+  bool Unref(CacheHandle* handle, bool set_usage, CleanupContext* context);
+
+  // Unset in-cache bit of the entry. Recycle the handle if necessary.
+  //
+  // returns true if a value is erased.
+  //
+  // Has to hold mutex_ before being called.
+  bool UnsetInCache(CacheHandle* handle, CleanupContext* context);
+
+  // Put the handle back to recycle_ list, and put the value associated with
+  // it into to-be-deleted list. It doesn't cleanup the key as it might be
+  // reused by another handle.
+  //
+  // Has to hold mutex_ before being called.
+  void RecycleHandle(CacheHandle* handle, CleanupContext* context);
+
+  // Delete keys and values in to-be-deleted list. Call the method without
+  // holding mutex, as destructors can be expensive.
+  void Cleanup(const CleanupContext& context);
+
+  // Examine the handle for eviction. If the handle is in cache, usage bit is
+  // not set, and reference count is 0, evict it from cache. Otherwise unset
+  // the usage bit.
+  //
+  // Has to hold mutex_ before being called.
+  bool TryEvict(CacheHandle* value, CleanupContext* context);
+
+  // Scan through the circular list, evict entries until we get enough capacity
+  // for new cache entry of specific size. Return true if success, false
+  // otherwise.
+  //
+  // Has to hold mutex_ before being called.
+  bool EvictFromCache(size_t charge, CleanupContext* context);
+
+  CacheHandle* Insert(const Slice& key, uint32_t hash, void* value,
+                      size_t charge,
+                      void (*deleter)(const Slice& key, void* value),
+                      bool hold_reference, CleanupContext* context);
+
+  // Guards list_, head_, and recycle_. In addition, updating table_ also has
+  // to hold the mutex, to avoid the cache being in inconsistent state.
+  mutable port::Mutex mutex_;
+
+  // The circular list of cache handles. Initially the list is empty. Once a
+  // handle is needed by insertion, and no more handles are available in
+  // recycle bin, one more handle is appended to the end.
+  //
+  // We use std::deque for the circular list because we want to make sure
+  // pointers to handles are valid throughout the life-cycle of the cache
+  // (in contrast to std::vector), and be able to grow the list (in contrast
+  // to statically allocated arrays).
+  std::deque<CacheHandle> list_;
+
+  // Index of the next handle in the circular list to be examined for
+  // eviction.
+  size_t head_;
+
+  // Recycle bin of cache handles.
+  autovector<CacheHandle*> recycle_;
+
+  // Maximum cache size.
+  std::atomic<size_t> capacity_;
+
+  // Current total size of the cache.
+  std::atomic<size_t> usage_;
+
+  // Total un-released cache size.
+  std::atomic<size_t> pinned_usage_;
+
+  // If true, inserts that request a handle fail when the cache is full.
+  std::atomic<bool> strict_capacity_limit_;
+
+  // Hash table (tbb::concurrent_hash_map) for lookup.
+  HashTable table_;
+};
+
+ClockCacheShard::ClockCacheShard()
+    : head_(0), usage_(0), pinned_usage_(0), strict_capacity_limit_(false) {}
+
+ClockCacheShard::~ClockCacheShard() {
+  for (auto& handle : list_) {
+    uint32_t flags = handle.flags.load(std::memory_order_relaxed);
+    if (InCache(flags) || CountRefs(flags) > 0) {
+      (*handle.deleter)(handle.key, handle.value);
+      delete[] handle.key.data();
+    }
+  }
+}
+
+size_t ClockCacheShard::GetUsage() const {
+  return usage_.load(std::memory_order_relaxed);
+}
+
+size_t ClockCacheShard::GetPinnedUsage() const {
+  return pinned_usage_.load(std::memory_order_relaxed);
+}
+
+void ClockCacheShard::ApplyToAllCacheEntries(void (*callback)(void*, size_t),
+                                             bool thread_safe) {
+  if (thread_safe) {
+    mutex_.Lock();
+  }
+  for (auto& handle : list_) {
+    // Use relaxed semantics instead of acquire semantics since we are either
+    // holding mutex, or don't have thread safe requirement.
+    uint32_t flags = handle.flags.load(std::memory_order_relaxed);
+    if (InCache(flags)) {
+      callback(handle.value, handle.charge);
+    }
+  }
+  if (thread_safe) {
+    mutex_.Unlock();
+  }
+}
+
+void ClockCacheShard::RecycleHandle(CacheHandle* handle,
+                                    CleanupContext* context) {
+  mutex_.AssertHeld();
+  assert(!InCache(handle->flags) && CountRefs(handle->flags) == 0);
+  context->to_delete_key.push_back(handle->key.data());
+  context->to_delete_value.emplace_back(*handle);
+  handle->key.clear();
+  handle->value = nullptr;
+  handle->deleter = nullptr;
+  recycle_.push_back(handle);
+  usage_.fetch_sub(handle->charge, std::memory_order_relaxed);
+}
+
+void ClockCacheShard::Cleanup(const CleanupContext& context) {
+  for (const CacheHandle& handle : context.to_delete_value) {
+    if (handle.deleter) {
+      (*handle.deleter)(handle.key, handle.value);
+    }
+  }
+  for (const char* key : context.to_delete_key) {
+    delete[] key;
+  }
+}
+
+bool ClockCacheShard::Ref(Cache::Handle* h) {
+  auto handle = reinterpret_cast<CacheHandle*>(h);
+  // CAS loop to increase reference count.
+  uint32_t flags = handle->flags.load(std::memory_order_relaxed);
+  while (InCache(flags)) {
+    // Use acquire semantics on success, as further operations on the cache
+    // entry have to be ordered after reference count is increased.
+    if (handle->flags.compare_exchange_weak(flags, flags + kOneRef,
+                                            std::memory_order_acquire,
+                                            std::memory_order_relaxed)) {
+      if (CountRefs(flags) == 0) {
+        // No reference count before the operation.
+        pinned_usage_.fetch_add(handle->charge, std::memory_order_relaxed);
+      }
+      return true;
+    }
+  }
+  return false;
+}
+
+bool ClockCacheShard::Unref(CacheHandle* handle, bool set_usage,
+                            CleanupContext* context) {
+  if (set_usage) {
+    handle->flags.fetch_or(kUsageBit, std::memory_order_relaxed);
+  }
+  // Use acquire-release semantics as previous operations on the cache entry
+  // have to be ordered before reference count is decreased, and potential
+  // cleanup of the entry has to be ordered after.
+  uint32_t flags = handle->flags.fetch_sub(kOneRef, std::memory_order_acq_rel);
+  assert(CountRefs(flags) > 0);
+  if (CountRefs(flags) == 1) {
+    // this is the last reference.
+    pinned_usage_.fetch_sub(handle->charge, std::memory_order_relaxed);
+    // Cleanup if it is the last reference.
+    if (!InCache(flags)) {
+      MutexLock l(&mutex_);
+      RecycleHandle(handle, context);
+    }
+  }
+  return context->to_delete_value.size();
+}
+
+bool ClockCacheShard::UnsetInCache(CacheHandle* handle,
+                                   CleanupContext* context) {
+  mutex_.AssertHeld();
+  // Use acquire-release semantics as previous operations on the cache entry
+  // have to be ordered before the in-cache bit is cleared, and potential
+  // cleanup of the entry has to be ordered after.
+  uint32_t flags =
+      handle->flags.fetch_and(~kInCacheBit, std::memory_order_acq_rel);
+  // Recycle if the cache itself held the last (and only) reference.
+  if (InCache(flags) && CountRefs(flags) == 0) {
+    RecycleHandle(handle, context);
+  }
+  return context->to_delete_value.size();
+}
+
+bool ClockCacheShard::TryEvict(CacheHandle* handle, CleanupContext* context) {
+  mutex_.AssertHeld();
+  uint32_t flags = kInCacheBit;
+  if (handle->flags.compare_exchange_strong(flags, 0, std::memory_order_acquire,
+                                            std::memory_order_relaxed)) {
+    bool erased __attribute__((__unused__)) =
+        table_.erase(CacheKey(handle->key, handle->hash));
+    assert(erased);
+    RecycleHandle(handle, context);
+    return true;
+  }
+  handle->flags.fetch_and(~kUsageBit, std::memory_order_relaxed);
+  return false;
+}
+
+bool ClockCacheShard::EvictFromCache(size_t charge, CleanupContext* context) {
+  size_t usage = usage_.load(std::memory_order_relaxed);
+  size_t capacity = capacity_.load(std::memory_order_relaxed);
+  if (usage == 0) {
+    return charge <= capacity;
+  }
+  size_t new_head = head_;
+  bool second_iteration = false;
+  while (usage + charge > capacity) {
+    assert(new_head < list_.size());
+    if (TryEvict(&list_[new_head], context)) {
+      usage = usage_.load(std::memory_order_relaxed);
+    }
+    new_head = (new_head + 1 >= list_.size()) ? 0 : new_head + 1;
+    if (new_head == head_) {
+      if (second_iteration) {
+        return false;
+      } else {
+        second_iteration = true;
+      }
+    }
+  }
+  head_ = new_head;
+  return true;
+}
+
+void ClockCacheShard::SetCapacity(size_t capacity) {
+  CleanupContext context;
+  {
+    MutexLock l(&mutex_);
+    capacity_.store(capacity, std::memory_order_relaxed);
+    EvictFromCache(0, &context);
+  }
+  Cleanup(context);
+}
+
+void ClockCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) {
+  strict_capacity_limit_.store(strict_capacity_limit,
+                               std::memory_order_relaxed);
+}
+
+CacheHandle* ClockCacheShard::Insert(
+    const Slice& key, uint32_t hash, void* value, size_t charge,
+    void (*deleter)(const Slice& key, void* value), bool hold_reference,
+    CleanupContext* context) {
+  MutexLock l(&mutex_);
+  bool success = EvictFromCache(charge, context);
+  bool strict = strict_capacity_limit_.load(std::memory_order_relaxed);
+  if (!success && (strict || !hold_reference)) {
+    context->to_delete_key.push_back(key.data());
+    if (!hold_reference) {
+      context->to_delete_value.emplace_back(key, value, deleter);
+    }
+    return nullptr;
+  }
+  // Grab available handle from recycle bin. If recycle bin is empty, create
+  // and append new handle to end of circular list.
+  CacheHandle* handle = nullptr;
+  if (!recycle_.empty()) {
+    handle = recycle_.back();
+    recycle_.pop_back();
+  } else {
+    list_.emplace_back();
+    handle = &list_.back();
+  }
+  // Fill handle.
+  handle->key = key;
+  handle->hash = hash;
+  handle->value = value;
+  handle->charge = charge;
+  handle->deleter = deleter;
+  uint32_t flags = hold_reference ? kInCacheBit + kOneRef : kInCacheBit;
+  handle->flags.store(flags, std::memory_order_relaxed);
+  HashTable::accessor accessor;
+  if (table_.find(accessor, CacheKey(key, hash))) {
+    CacheHandle* existing_handle = accessor->second;
+    table_.erase(accessor);
+    UnsetInCache(existing_handle, context);
+  }
+  table_.insert(HashTable::value_type(CacheKey(key, hash), handle));
+  if (hold_reference) {
+    pinned_usage_.fetch_add(charge, std::memory_order_relaxed);
+  }
+  usage_.fetch_add(charge, std::memory_order_relaxed);
+  return handle;
+}
+
+Status ClockCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
+                               size_t charge,
+                               void (*deleter)(const Slice& key, void* value),
+                               Cache::Handle** out_handle,
+                               Cache::Priority priority) {
+  CleanupContext context;
+  // The shard keeps its own copy of the key; it is released with delete[].
+  char* key_data = new char[key.size()];
+  memcpy(key_data, key.data(), key.size());
+  Slice key_copy(key_data, key.size());
+  CacheHandle* handle = Insert(key_copy, hash, value, charge, deleter,
+                               out_handle != nullptr, &context);
+  Status s;
+  if (out_handle != nullptr) {
+    if (handle == nullptr) {
+      s = Status::Incomplete("Insert failed due to clock cache being full.");
+    } else {
+      *out_handle = reinterpret_cast<Cache::Handle*>(handle);
+    }
+  }
+  Cleanup(context);
+  return s;
+}
+
+Cache::Handle* ClockCacheShard::Lookup(const Slice& key, uint32_t hash) {
+  HashTable::const_accessor accessor;
+  if (!table_.find(accessor, CacheKey(key, hash))) {
+    return nullptr;
+  }
+  CacheHandle* handle = accessor->second;
+  accessor.release();
+  // Ref() could fail if another thread sneaks in and evicts/erases the cache
+  // entry before we are able to hold a reference.
+  if (!Ref(reinterpret_cast<Cache::Handle*>(handle))) {
+    return nullptr;
+  }
+  // Double check the key since the handle may now be representing another key
+  // if other threads sneak in, evict/erase the entry and reuse the handle
+  // for another cache entry.
+  if (hash != handle->hash || key != handle->key) {
+    CleanupContext context;
+    Unref(handle, false, &context);
+    // It is possible Unref() deletes the entry, so we need to cleanup.
+    Cleanup(context);
+    return nullptr;
+  }
+  return reinterpret_cast<Cache::Handle*>(handle);
+}
+
+bool ClockCacheShard::Release(Cache::Handle* h, bool force_erase) {
+  CleanupContext context;
+  CacheHandle* handle = reinterpret_cast<CacheHandle*>(h);
+  bool erased = Unref(handle, true, &context);
+  if (force_erase && !erased) {
+    erased = EraseAndConfirm(handle->key, handle->hash, &context);
+  }
+  Cleanup(context);
+  return erased;
+}
+
+void ClockCacheShard::Erase(const Slice& key, uint32_t hash) {
+  CleanupContext context;
+  EraseAndConfirm(key, hash, &context);
+  Cleanup(context);
+}
+
+bool ClockCacheShard::EraseAndConfirm(const Slice& key, uint32_t hash,
+                                      CleanupContext* context) {
+  MutexLock l(&mutex_);
+  HashTable::accessor accessor;
+  bool erased = false;
+  if (table_.find(accessor, CacheKey(key, hash))) {
+    CacheHandle* handle = accessor->second;
+    table_.erase(accessor);
+    erased = UnsetInCache(handle, context);
+  }
+  return erased;
+}
+
+void ClockCacheShard::EraseUnRefEntries() {
+  CleanupContext context;
+  {
+    MutexLock l(&mutex_);
+    table_.clear();
+    for (auto& handle : list_) {
+      UnsetInCache(&handle, &context);
+    }
+  }
+  Cleanup(context);
+}
+
+class ClockCache : public ShardedCache {
+ public:
+  ClockCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit)
+      : ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
+    int num_shards = 1 << num_shard_bits;
+    shards_ = new ClockCacheShard[num_shards];
+    SetCapacity(capacity);
+    SetStrictCapacityLimit(strict_capacity_limit);
+  }
+
+  virtual ~ClockCache() { delete[] shards_; }
+
+  virtual const char* Name() const override { return "ClockCache"; }
+
+  virtual CacheShard* GetShard(int shard) override {
+    return reinterpret_cast<CacheShard*>(&shards_[shard]);
+  }
+
+  virtual const CacheShard* GetShard(int shard) const override {
+    return reinterpret_cast<CacheShard*>(&shards_[shard]);
+  }
+
+  virtual void* Value(Handle* handle) override {
+    return reinterpret_cast<const CacheHandle*>(handle)->value;
+  }
+
+  virtual size_t GetCharge(Handle* handle) const override {
+    return reinterpret_cast<const CacheHandle*>(handle)->charge;
+  }
+
+  virtual uint32_t GetHash(Handle* handle) const override {
+    return reinterpret_cast<const CacheHandle*>(handle)->hash;
+  }
+
+  virtual void DisownData() override { shards_ = nullptr; }
+
+ private:
+  ClockCacheShard* shards_;
+};
+
+}  // end anonymous namespace
+
+std::shared_ptr<Cache> NewClockCache(size_t capacity, int num_shard_bits,
+                                     bool strict_capacity_limit) {
+  if (num_shard_bits < 0) {
+    num_shard_bits = GetDefaultCacheShardBits(capacity);
+  }
+  return std::make_shared<ClockCache>(capacity, num_shard_bits,
+                                      strict_capacity_limit);
+}
+
+}  // namespace rocksdb
+
+#endif  // SUPPORT_CLOCK_CACHE
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/cache/clock_cache.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/cache/clock_cache.h b/thirdparty/rocksdb/cache/clock_cache.h
new file mode 100644
index 0000000..1614c0e
--- /dev/null
+++ b/thirdparty/rocksdb/cache/clock_cache.h
@@ -0,0 +1,16 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+
+#include "rocksdb/cache.h"
+
+#if defined(TBB) && !defined(ROCKSDB_LITE)  // clock cache is built only with TBB, never in ROCKSDB_LITE
+#define SUPPORT_CLOCK_CACHE
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/cache/lru_cache.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/cache/lru_cache.cc b/thirdparty/rocksdb/cache/lru_cache.cc
new file mode 100644
index 0000000..f833374
--- /dev/null
+++ b/thirdparty/rocksdb/cache/lru_cache.cc
@@ -0,0 +1,524 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include "cache/lru_cache.h"
+
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string>
+
+#include "util/mutexlock.h"
+
+namespace rocksdb {
+
+LRUHandleTable::LRUHandleTable() : list_(nullptr), length_(0), elems_(0) {
+  Resize();  // with elems_ == 0 this allocates the initial 16-bucket table
+}
+
+LRUHandleTable::~LRUHandleTable() {
+  ApplyToAllCacheEntries([](LRUHandle* h) {
+    if (h->refs == 1) {  // free only entries whose sole reference is the cache's
+      h->Free();
+    }
+  });
+  delete[] list_;
+}
+
+LRUHandle* LRUHandleTable::Lookup(const Slice& key, uint32_t hash) {
+  return *FindPointer(key, hash);  // nullptr when the key is absent
+}
+
+LRUHandle* LRUHandleTable::Insert(LRUHandle* h) {
+  LRUHandle** ptr = FindPointer(h->key(), h->hash);
+  LRUHandle* old = *ptr;
+  h->next_hash = (old == nullptr ? nullptr : old->next_hash);
+  *ptr = h;
+  if (old == nullptr) {
+    ++elems_;
+    if (elems_ > length_) {
+      // Since each cache entry is fairly large, we aim for a small
+      // average linked list length (<= 1).
+      Resize();
+    }
+  }
+  return old;  // the replaced entry, or nullptr if the key was new
+}
+
+LRUHandle* LRUHandleTable::Remove(const Slice& key, uint32_t hash) {
+  LRUHandle** ptr = FindPointer(key, hash);
+  LRUHandle* result = *ptr;
+  if (result != nullptr) {
+    *ptr = result->next_hash;
+    --elems_;
+  }
+  return result;  // the unlinked entry, or nullptr if nothing matched
+}
+
+LRUHandle** LRUHandleTable::FindPointer(const Slice& key, uint32_t hash) {
+  LRUHandle** ptr = &list_[hash & (length_ - 1)];  // length_ is a power of two
+  while (*ptr != nullptr && ((*ptr)->hash != hash || key != (*ptr)->key())) {
+    ptr = &(*ptr)->next_hash;
+  }
+  return ptr;  // slot holding the match, or the chain's terminating nullptr slot
+}
+
+void LRUHandleTable::Resize() {
+  uint32_t new_length = 16;  // smallest power of two >= 16 and >= 1.5 * elems_
+  while (new_length < elems_ * 1.5) {
+    new_length *= 2;
+  }
+  LRUHandle** new_list = new LRUHandle*[new_length];
+  memset(new_list, 0, sizeof(new_list[0]) * new_length);
+  uint32_t count = 0;
+  for (uint32_t i = 0; i < length_; i++) {
+    LRUHandle* h = list_[i];
+    while (h != nullptr) {
+      LRUHandle* next = h->next_hash;
+      uint32_t hash = h->hash;
+      LRUHandle** ptr = &new_list[hash & (new_length - 1)];
+      h->next_hash = *ptr;
+      *ptr = h;
+      h = next;
+      count++;
+    }
+  }
+  assert(elems_ == count);
+  delete[] list_;
+  list_ = new_list;
+  length_ = new_length;
+}
+
+LRUCacheShard::LRUCacheShard()
+    : high_pri_pool_usage_(0), usage_(0), lru_usage_(0) {
+  // Make empty circular linked list
+  lru_.next = &lru_;
+  lru_.prev = &lru_;
+  lru_low_pri_ = &lru_;
+}
+
+LRUCacheShard::~LRUCacheShard() {}
+
+bool LRUCacheShard::Unref(LRUHandle* e) {  // true if refs dropped to 0
+  assert(e->refs > 0);
+  e->refs--;
+  return e->refs == 0;
+}
+
+// Removes every entry on the LRU list (refs == 1, held only by the cache)
+// from the table, then frees them after the mutex has been released.
+void LRUCacheShard::EraseUnRefEntries() {
+  autovector<LRUHandle*> last_reference_list;
+  {
+    MutexLock l(&mutex_);
+    while (lru_.next != &lru_) {
+      LRUHandle* old = lru_.next;
+      assert(old->InCache());
+      assert(old->refs ==
+             1);  // LRU list contains elements which may be evicted
+      LRU_Remove(old);
+      table_.Remove(old->key(), old->hash);
+      old->SetInCache(false);
+      Unref(old);
+      usage_ -= old->charge;
+      last_reference_list.push_back(old);
+    }
+  }
+
+  for (auto entry : last_reference_list) {
+    entry->Free();
+  }
+}
+
+void LRUCacheShard::ApplyToAllCacheEntries(void (*callback)(void*, size_t),
+                                           bool thread_safe) {
+  if (thread_safe) {
+    mutex_.Lock();
+  }
+  table_.ApplyToAllCacheEntries(
+      [callback](LRUHandle* h) { callback(h->value, h->charge); });
+  if (thread_safe) {
+    mutex_.Unlock();
+  }
+}
+
+void LRUCacheShard::TEST_GetLRUList(LRUHandle** lru, LRUHandle** lru_low_pri) {
+  *lru = &lru_;
+  *lru_low_pri = lru_low_pri_;
+}
+
+size_t LRUCacheShard::TEST_GetLRUSize() {
+  LRUHandle* lru_handle = lru_.next;
+  size_t lru_size = 0;
+  while (lru_handle != &lru_) {
+    lru_size++;
+    lru_handle = lru_handle->next;
+  }
+  return lru_size;
+}
+
+void LRUCacheShard::LRU_Remove(LRUHandle* e) {
+  assert(e->next != nullptr);
+  assert(e->prev != nullptr);
+  if (lru_low_pri_ == e) {  // keep the low-pri pool boundary pointing at a live node
+    lru_low_pri_ = e->prev;
+  }
+  e->next->prev = e->prev;
+  e->prev->next = e->next;
+  e->prev = e->next = nullptr;
+  lru_usage_ -= e->charge;
+  if (e->InHighPriPool()) {
+    assert(high_pri_pool_usage_ >= e->charge);
+    high_pri_pool_usage_ -= e->charge;
+  }
+}
+
+void LRUCacheShard::LRU_Insert(LRUHandle* e) {
+  assert(e->next == nullptr);
+  assert(e->prev == nullptr);
+  if (high_pri_pool_ratio_ > 0 && e->IsHighPri()) {
+    // Insert "e" at the head of the LRU list, inside the high-pri pool.
+    e->next = &lru_;
+    e->prev = lru_.prev;
+    e->prev->next = e;
+    e->next->prev = e;
+    e->SetInHighPriPool(true);
+    high_pri_pool_usage_ += e->charge;
+    MaintainPoolSize();
+  } else {
+    // Insert "e" to the head of low-pri pool. Note that when
+    // high_pri_pool_ratio is 0, head of low-pri pool is also head of LRU list.
+    e->next = lru_low_pri_->next;
+    e->prev = lru_low_pri_;
+    e->prev->next = e;
+    e->next->prev = e;
+    e->SetInHighPriPool(false);
+    lru_low_pri_ = e;
+  }
+  lru_usage_ += e->charge;
+}
+
+void LRUCacheShard::MaintainPoolSize() {  // demote until the high-pri pool fits
+  while (high_pri_pool_usage_ > high_pri_pool_capacity_) {
+    // Overflow last entry in high-pri pool to low-pri pool.
+    lru_low_pri_ = lru_low_pri_->next;
+    assert(lru_low_pri_ != &lru_);
+    lru_low_pri_->SetInHighPriPool(false);
+    high_pri_pool_usage_ -= lru_low_pri_->charge;
+  }
+}
+
+void LRUCacheShard::EvictFromLRU(size_t charge,
+                                 autovector<LRUHandle*>* deleted) {  // caller frees these
+  while (usage_ + charge > capacity_ && lru_.next != &lru_) {
+    LRUHandle* old = lru_.next;
+    assert(old->InCache());
+    assert(old->refs == 1);  // LRU list contains elements which may be evicted
+    LRU_Remove(old);
+    table_.Remove(old->key(), old->hash);
+    old->SetInCache(false);
+    Unref(old);
+    usage_ -= old->charge;
+    deleted->push_back(old);
+  }
+}
+
+void* LRUCacheShard::operator new(size_t size) {
+  return rocksdb::port::cacheline_aligned_alloc(size);
+}
+
+void LRUCacheShard::operator delete(void *memblock) {
+  rocksdb::port::cacheline_aligned_free(memblock);
+}
+
+void LRUCacheShard::SetCapacity(size_t capacity) {
+  autovector<LRUHandle*> last_reference_list;
+  {
+    MutexLock l(&mutex_);
+    capacity_ = capacity;
+    high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_;
+    EvictFromLRU(0, &last_reference_list);
+  }
+  // we free the entries here outside of mutex for
+  // performance reasons
+  for (auto entry : last_reference_list) {
+    entry->Free();
+  }
+}
+
+void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) {
+  MutexLock l(&mutex_);
+  strict_capacity_limit_ = strict_capacity_limit;
+}
+
+Cache::Handle* LRUCacheShard::Lookup(const Slice& key, uint32_t hash) {
+  MutexLock l(&mutex_);
+  LRUHandle* e = table_.Lookup(key, hash);
+  if (e != nullptr) {
+    assert(e->InCache());
+    if (e->refs == 1) {  // was on the LRU list; referenced entries must leave it
+      LRU_Remove(e);
+    }
+    e->refs++;
+  }
+  return reinterpret_cast<Cache::Handle*>(e);
+}
+
+bool LRUCacheShard::Ref(Cache::Handle* h) {
+  LRUHandle* handle = reinterpret_cast<LRUHandle*>(h);
+  MutexLock l(&mutex_);
+  if (handle->InCache() && handle->refs == 1) {
+    LRU_Remove(handle);
+  }
+  handle->refs++;
+  return true;
+}
+
+void LRUCacheShard::SetHighPriorityPoolRatio(double high_pri_pool_ratio) {
+  MutexLock l(&mutex_);
+  high_pri_pool_ratio_ = high_pri_pool_ratio;
+  high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_;
+  MaintainPoolSize();
+}
+
+bool LRUCacheShard::Release(Cache::Handle* handle, bool force_erase) {  // true if freed
+  if (handle == nullptr) {
+    return false;
+  }
+  LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
+  bool last_reference = false;
+  {
+    MutexLock l(&mutex_);
+    last_reference = Unref(e);
+    if (last_reference) {
+      usage_ -= e->charge;
+    }
+    if (e->refs == 1 && e->InCache()) {
+      // The item is still in cache, and nobody else holds a reference to it
+      if (usage_ > capacity_ || force_erase) {
+        // the cache is over capacity, or the caller forced an erase
+        // The LRU list must be empty since the cache is full
+        assert(!(usage_ > capacity_) || lru_.next == &lru_);
+        // take this opportunity and remove the item
+        table_.Remove(e->key(), e->hash);
+        e->SetInCache(false);
+        Unref(e);
+        usage_ -= e->charge;
+        last_reference = true;
+      } else {
+        // put the item on the list to be potentially freed
+        LRU_Insert(e);
+      }
+    }
+  }
+
+  // free outside of mutex
+  if (last_reference) {
+    e->Free();
+  }
+  return last_reference;
+}
+
+Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
+                             size_t charge,
+                             void (*deleter)(const Slice& key, void* value),
+                             Cache::Handle** handle, Cache::Priority priority) {
+  // Allocate the memory here outside of the mutex
+  // If the cache is full, we'll have to release it
+  // It shouldn't happen very often though.
+  LRUHandle* e = reinterpret_cast<LRUHandle*>(
+      new char[sizeof(LRUHandle) - 1 + key.size()]);
+  Status s;
+  autovector<LRUHandle*> last_reference_list;
+
+  e->value = value;
+  e->deleter = deleter;
+  e->charge = charge;
+  e->key_length = key.size();
+  e->hash = hash;
+  e->refs = (handle == nullptr
+                 ? 1
+                 : 2);  // One from LRUCache, one for the returned handle
+  e->next = e->prev = nullptr;
+  e->SetInCache(true);
+  e->SetPriority(priority);
+  memcpy(e->key_data, key.data(), key.size());
+
+  {
+    MutexLock l(&mutex_);
+
+    // Free the space following strict LRU policy until enough space
+    // is freed or the lru list is empty
+    EvictFromLRU(charge, &last_reference_list);
+
+    if (usage_ - lru_usage_ + charge > capacity_ &&
+        (strict_capacity_limit_ || handle == nullptr)) {
+      if (handle == nullptr) {
+        // Don't insert the entry but still return ok, as if the entry were
+        // inserted into the cache and evicted immediately.
+        last_reference_list.push_back(e);
+      } else {
+        delete[] reinterpret_cast<char*>(e);
+        *handle = nullptr;
+        s = Status::Incomplete("Insert failed due to LRU cache being full.");
+      }
+    } else {
+      // insert into the cache
+      // note that the cache might get larger than its capacity if not enough
+      // space was freed
+      LRUHandle* old = table_.Insert(e);
+      usage_ += e->charge;
+      if (old != nullptr) {
+        old->SetInCache(false);
+        if (Unref(old)) {
+          usage_ -= old->charge;
+          // old is on LRU because it's in cache and its reference count
+          // was just 1 (Unref returned true)
+          LRU_Remove(old);
+          last_reference_list.push_back(old);
+        }
+      }
+      if (handle == nullptr) {
+        LRU_Insert(e);
+      } else {
+        *handle = reinterpret_cast<Cache::Handle*>(e);
+      }
+      s = Status::OK();
+    }
+  }
+
+  // we free the entries here outside of mutex for
+  // performance reasons
+  for (auto entry : last_reference_list) {
+    entry->Free();
+  }
+
+  return s;
+}
+
+void LRUCacheShard::Erase(const Slice& key, uint32_t hash) {
+  LRUHandle* e;
+  bool last_reference = false;
+  {
+    MutexLock l(&mutex_);
+    e = table_.Remove(key, hash);
+    if (e != nullptr) {
+      last_reference = Unref(e);
+      if (last_reference) {
+        usage_ -= e->charge;
+      }
+      if (last_reference && e->InCache()) {
+        LRU_Remove(e);
+      }
+      e->SetInCache(false);
+    }
+  }
+
+  // mutex not held here
+  // last_reference will only be true if e != nullptr
+  if (last_reference) {
+    e->Free();
+  }
+}
+
+size_t LRUCacheShard::GetUsage() const {
+  MutexLock l(&mutex_);
+  return usage_;
+}
+
+size_t LRUCacheShard::GetPinnedUsage() const {
+  MutexLock l(&mutex_);
+  assert(usage_ >= lru_usage_);
+  return usage_ - lru_usage_;  // charge of entries that are not on the LRU list
+}
+
+std::string LRUCacheShard::GetPrintableOptions() const {
+  const int kBufferSize = 200;
+  char buffer[kBufferSize];
+  {
+    MutexLock l(&mutex_);
+    snprintf(buffer, kBufferSize, " high_pri_pool_ratio: %.3lf\n",
+             high_pri_pool_ratio_);
+  }
+  return std::string(buffer);
+}
+
+LRUCache::LRUCache(size_t capacity, int num_shard_bits,
+                   bool strict_capacity_limit, double high_pri_pool_ratio)
+    : ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
+  num_shards_ = 1 << num_shard_bits;
+#if defined(_MSC_VER)
+#pragma warning(push)
+#pragma warning(disable: 4316) // We've validated the alignment with the new operators
+#endif
+  shards_ = new LRUCacheShard[num_shards_];
+#if defined(_MSC_VER)
+#pragma warning(pop)
+#endif
+  SetCapacity(capacity);
+  SetStrictCapacityLimit(strict_capacity_limit);
+  for (int i = 0; i < num_shards_; i++) {
+    shards_[i].SetHighPriorityPoolRatio(high_pri_pool_ratio);
+  }
+}
+
+LRUCache::~LRUCache() { delete[] shards_; }
+
+CacheShard* LRUCache::GetShard(int shard) {
+  return reinterpret_cast<CacheShard*>(&shards_[shard]);
+}
+
+const CacheShard* LRUCache::GetShard(int shard) const {
+  return reinterpret_cast<CacheShard*>(&shards_[shard]);
+}
+
+void* LRUCache::Value(Handle* handle) {
+  return reinterpret_cast<const LRUHandle*>(handle)->value;
+}
+
+size_t LRUCache::GetCharge(Handle* handle) const {
+  return reinterpret_cast<const LRUHandle*>(handle)->charge;
+}
+
+uint32_t LRUCache::GetHash(Handle* handle) const {
+  return reinterpret_cast<const LRUHandle*>(handle)->hash;
+}
+
+void LRUCache::DisownData() { shards_ = nullptr; }
+
+size_t LRUCache::TEST_GetLRUSize() {
+  size_t lru_size_of_all_shards = 0;
+  for (int i = 0; i < num_shards_; i++) {
+    lru_size_of_all_shards += shards_[i].TEST_GetLRUSize();
+  }
+  return lru_size_of_all_shards;
+}
+
+std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
+                                   bool strict_capacity_limit,
+                                   double high_pri_pool_ratio) {
+  if (num_shard_bits >= 20) {
+    return nullptr;  // the cache cannot be sharded into too many fine pieces
+  }
+  if (high_pri_pool_ratio < 0.0 || high_pri_pool_ratio > 1.0) {
+    // invalid high_pri_pool_ratio
+    return nullptr;
+  }
+  if (num_shard_bits < 0) {  // negative: derive a default from capacity
+    num_shard_bits = GetDefaultCacheShardBits(capacity);
+  }
+  return std::make_shared<LRUCache>(capacity, num_shard_bits,
+                                    strict_capacity_limit, high_pri_pool_ratio);
+}
+
+}  // namespace rocksdb
