This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 9ff37361 feat(c/driver/postgresql): INSERT benchmark for postgres
(#1189)
9ff37361 is described below
commit 9ff373611f7f95f6b93157ea3973dbc74559f3ee
Author: William Ayd <[email protected]>
AuthorDate: Fri Oct 13 11:57:42 2023 -0400
feat(c/driver/postgresql): INSERT benchmark for postgres (#1189)
Pretty hacked together but I think it is a workable foundation
---
.pre-commit-config.yaml | 2 +-
c/CMakeLists.txt | 6 +
c/driver/postgresql/CMakeLists.txt | 17 ++
c/driver/postgresql/postgresql_benchmark.cc | 176 +++++++++++++++++++++
ci/build_support/run-test.sh | 235 ++++++++++++++++++++++++++++
5 files changed, 435 insertions(+), 1 deletion(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 8bed1221..9b96d955 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -56,7 +56,7 @@ repos:
- id: cpplint
args:
# From Arrow's config
- -
"--filter=-whitespace/comments,-whitespace/indent,-readability/braces,-readability/casting,-readability/todo,-readability/alt_tokens,-build/header_guard,-build/c++11,-build/include_order,-build/include_subdir"
+ -
"--filter=-whitespace/comments,-whitespace/indent,-readability/braces,-readability/casting,-readability/todo,-readability/alt_tokens,-build/header_guard,-build/c++11,-build/include_order,-build/include_subdir,-runtime/references"
- "--linelength=90"
- "--verbose=2"
- repo: https://github.com/golangci/golangci-lint
diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt
index 4115da7d..9b1ec5ba 100644
--- a/c/CMakeLists.txt
+++ b/c/CMakeLists.txt
@@ -33,6 +33,12 @@ if(ADBC_BUILD_TESTS)
add_subdirectory(validation)
endif()
+if(ADBC_BUILD_BENCHMARKS)
+ add_custom_target(all-benchmarks)
+ add_custom_target(benchmark ctest -L benchmark)
+ add_dependencies(benchmark all-benchmarks)
+endif()
+
if(ADBC_INTEGRATION_DUCKDB)
set(ADBC_DRIVER_MANAGER ON)
endif()
diff --git a/c/driver/postgresql/CMakeLists.txt
b/c/driver/postgresql/CMakeLists.txt
index bd1a68e2..b98e6ea3 100644
--- a/c/driver/postgresql/CMakeLists.txt
+++ b/c/driver/postgresql/CMakeLists.txt
@@ -92,3 +92,20 @@ if(ADBC_BUILD_TESTS)
${REPOSITORY_ROOT}/c/driver)
adbc_configure_target(adbc-driver-postgresql-test)
endif()
+
+if(ADBC_BUILD_BENCHMARKS)
+ find_package(benchmark REQUIRED)
+ # TODO: should add_benchmark be linking benchmark::benchmark for us?
+ add_benchmark(postgresql_benchmark
+ EXTRA_LINK_LIBS
+ adbc_driver_common
+ adbc_validation
+ nanoarrow
+ ${TEST_LINK_LIBS}
+ benchmark::benchmark)
+ # add_benchmark replaces _ with - when creating target
+ target_include_directories(postgresql-benchmark
+ PRIVATE ${REPOSITORY_ROOT} ${REPOSITORY_ROOT}/c/
+ ${REPOSITORY_ROOT}/c/vendor
+ ${REPOSITORY_ROOT}/c/driver)
+endif()
diff --git a/c/driver/postgresql/postgresql_benchmark.cc
b/c/driver/postgresql/postgresql_benchmark.cc
new file mode 100644
index 00000000..85a4ae7c
--- /dev/null
+++ b/c/driver/postgresql/postgresql_benchmark.cc
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include <benchmark/benchmark.h>
+#include <nanoarrow/nanoarrow.hpp>
+
+#include "adbc.h"
+#include "validation/adbc_validation_util.h"
+
+static void BM_PostgresqlExecute(benchmark::State& state) {
+ const char* uri = std::getenv("ADBC_POSTGRESQL_TEST_URI");
+ if (!uri) {
+ state.SkipWithError("ADBC_POSTGRESQL_TEST_URI not set!");
+ }
+ adbc_validation::Handle<struct AdbcDatabase> database;
+ struct AdbcError error;
+
+ if (AdbcDatabaseNew(&database.value, &error) != ADBC_STATUS_OK) {
+ state.SkipWithError("AdbcDatabaseNew call failed");
+ }
+
+ if (AdbcDatabaseSetOption(&database.value, "uri", uri, &error) !=
ADBC_STATUS_OK) {
+ state.SkipWithError("Could not set database uri option");
+ }
+
+ if (AdbcDatabaseInit(&database.value, &error) != ADBC_STATUS_OK) {
+ state.SkipWithError("AdbcDatabaseInit failed");
+ }
+
+ adbc_validation::Handle<struct AdbcConnection> connection;
+ if (AdbcConnectionNew(&connection.value, &error) != ADBC_STATUS_OK) {
+ state.SkipWithError("Could not create connection object");
+ }
+
+ if (AdbcConnectionInit(&connection.value, &database.value, &error) !=
ADBC_STATUS_OK) {
+ state.SkipWithError("Could not connect to database");
+ }
+
+ adbc_validation::Handle<struct AdbcStatement> statement;
+ if (AdbcStatementNew(&connection.value, &statement.value, &error) !=
ADBC_STATUS_OK) {
+ state.SkipWithError("Could not create statement object");
+ }
+
+ const char* drop_query = "DROP TABLE IF EXISTS
adbc_postgresql_ingest_benchmark";
+ if (AdbcStatementSetSqlQuery(&statement.value, drop_query, &error)
+ != ADBC_STATUS_OK) {
+ state.SkipWithError("Could not set DROP TABLE SQL query");
+ }
+
+ if (AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error)
+ != ADBC_STATUS_OK) {
+ state.SkipWithError("Could not execute DROP TABLE SQL query");
+ }
+
+ adbc_validation::Handle<struct ArrowSchema> schema;
+ adbc_validation::Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+
+ if (adbc_validation::MakeSchema(&schema.value, {
+ {"bools", NANOARROW_TYPE_BOOL},
+ {"int16s", NANOARROW_TYPE_INT16},
+ {"int32s", NANOARROW_TYPE_INT32},
+ {"int64s", NANOARROW_TYPE_INT64},
+ {"floats", NANOARROW_TYPE_FLOAT},
+ {"doubles", NANOARROW_TYPE_DOUBLE},
+ }) != ADBC_STATUS_OK) {
+ state.SkipWithError("Could not create benchmark schema");
+ }
+
+ if (ArrowArrayInitFromSchema(&array.value, &schema.value, &na_error) !=
NANOARROW_OK) {
+ state.SkipWithError("Could not init array from schema");
+ }
+
+ if (ArrowArrayStartAppending(&array.value) != NANOARROW_OK) {
+ state.SkipWithError("Could not start appending to array");
+ }
+
+ const size_t n_zeros = 1000;
+ const size_t n_ones = 1000;
+
+ for (size_t i = 0; i < n_zeros; i++) {
+ // assumes fixed size primitive layouts for now
+ ArrowBufferAppendInt8(ArrowArrayBuffer(array.value.children[0], 1), 0);
+ ArrowBufferAppendInt16(ArrowArrayBuffer(array.value.children[1], 1), 0);
+ ArrowBufferAppendInt32(ArrowArrayBuffer(array.value.children[2], 1), 0);
+ ArrowBufferAppendInt64(ArrowArrayBuffer(array.value.children[3], 1), 0);
+ ArrowBufferAppendFloat(ArrowArrayBuffer(array.value.children[4], 1), 0.0);
+ ArrowBufferAppendDouble(ArrowArrayBuffer(array.value.children[5], 1), 0.0);
+ }
+ for (size_t i = 0; i < n_ones; i++) {
+ // assumes fixed size primitive layouts for now
+ ArrowBufferAppendInt8(ArrowArrayBuffer(array.value.children[0], 1), 1);
+ ArrowBufferAppendInt16(ArrowArrayBuffer(array.value.children[1], 1), 1);
+ ArrowBufferAppendInt32(ArrowArrayBuffer(array.value.children[2], 1), 1);
+ ArrowBufferAppendInt64(ArrowArrayBuffer(array.value.children[3], 1), 1);
+ ArrowBufferAppendFloat(ArrowArrayBuffer(array.value.children[4], 1), 1.0);
+ ArrowBufferAppendDouble(ArrowArrayBuffer(array.value.children[5], 1), 1.0);
+ }
+
+ for (int64_t i = 0; i < array.value.n_children; i++) {
+ array.value.children[i]->length = n_zeros + n_ones;
+ }
+ array.value.length = n_zeros + n_ones;
+
+ if (ArrowArrayFinishBuildingDefault(&array.value, &na_error) !=
NANOARROW_OK) {
+ state.SkipWithError("Could not finish array");
+ }
+
+ const char* create_query =
+ "CREATE TABLE adbc_postgresql_ingest_benchmark (bools BOOLEAN, int16s
SMALLINT, "
+ "int32s INTEGER, int64s BIGINT, floats REAL, doubles DOUBLE PRECISION)";
+
+ if (AdbcStatementSetSqlQuery(&statement.value, create_query, &error)
+ != ADBC_STATUS_OK) {
+ state.SkipWithError("Could not set CREATE TABLE SQL query");
+ }
+
+ if (AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error)
+ != ADBC_STATUS_OK) {
+ state.SkipWithError("Could not execute CREATE TABLE SQL query");
+ }
+
+ adbc_validation::Handle<struct AdbcStatement> insert_stmt;
+ if (AdbcStatementNew(&connection.value, &insert_stmt.value, &error) !=
ADBC_STATUS_OK) {
+ state.SkipWithError("Could not create INSERT statement object");
+ }
+
+ if (AdbcStatementSetOption(&insert_stmt.value,
+ ADBC_INGEST_OPTION_TARGET_TABLE,
+ "adbc_postgresql_ingest_benchmark",
+ &error) != ADBC_STATUS_OK) {
+ state.SkipWithError("Could not set bulk_ingest statement option");
+ }
+
+ if (AdbcStatementSetOption(&insert_stmt.value,
+ ADBC_INGEST_OPTION_MODE,
+ ADBC_INGEST_OPTION_MODE_APPEND,
+ &error) != ADBC_STATUS_OK) {
+ state.SkipWithError("Could not set bulk_ingest append option");
+ }
+
+ for (auto _ : state) {
+ // Bind releases the array, so if this actually loops you will get errors
+ // and memory leaks
+ AdbcStatementBind(&insert_stmt.value, &array.value, &schema.value, &error);
+ AdbcStatementExecuteQuery(&insert_stmt.value, nullptr, nullptr, &error);
+ }
+
+ if (AdbcStatementSetSqlQuery(&statement.value, drop_query, &error)
+ != ADBC_STATUS_OK) {
+ state.SkipWithError("Could not set DROP TABLE SQL query");
+ }
+
+ if (AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error)
+ != ADBC_STATUS_OK) {
+ state.SkipWithError("Could not execute DROP TABLE SQL query");
+ }
+}
+
+BENCHMARK(BM_PostgresqlExecute);
+BENCHMARK_MAIN();
diff --git a/ci/build_support/run-test.sh b/ci/build_support/run-test.sh
new file mode 100755
index 00000000..8e42438a
--- /dev/null
+++ b/ci/build_support/run-test.sh
@@ -0,0 +1,235 @@
+#!/usr/bin/env bash
+# Copyright 2014 Cloudera, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Script which wraps running a test and redirects its output to a
+# test log directory.
+#
+# Arguments:
+# $1 - Base path for logs/artifacts.
+# $2 - type of test (e.g. test or benchmark)
+# $3 - path to executable
+# $ARGN - arguments for executable
+#
+
+OUTPUT_ROOT=$1
+shift
+ROOT=$(cd $(dirname $BASH_SOURCE)/..; pwd)
+
+TEST_LOGDIR=$OUTPUT_ROOT/build/$1-logs
+mkdir -p $TEST_LOGDIR
+
+RUN_TYPE=$1
+shift
+TEST_DEBUGDIR=$OUTPUT_ROOT/build/$RUN_TYPE-debug
+mkdir -p $TEST_DEBUGDIR
+
+TEST_DIRNAME=$(cd $(dirname $1); pwd)
+TEST_FILENAME=$(basename $1)
+shift
+TEST_EXECUTABLE="$TEST_DIRNAME/$TEST_FILENAME"
+TEST_NAME=$(echo $TEST_FILENAME | sed -E -e 's/\..+$//') # Remove path and
extension (if any).
+
+# We run each test in its own subdir to avoid core file related races.
+TEST_WORKDIR=$OUTPUT_ROOT/build/test-work/$TEST_NAME
+mkdir -p $TEST_WORKDIR
+pushd $TEST_WORKDIR >/dev/null || exit 1
+rm -f *
+
+set -o pipefail
+
+LOGFILE=$TEST_LOGDIR/$TEST_NAME.txt
+XMLFILE=$TEST_LOGDIR/$TEST_NAME.xml
+
+TEST_EXECUTION_ATTEMPTS=1
+
+# Remove both the compressed and uncompressed output, so the developer
doesn't accidentally get confused
+# and read output from a prior test run.
+rm -f $LOGFILE $LOGFILE.gz
+
+pipe_cmd=cat
+
+function setup_sanitizers() {
+ # Sets environment variables for different sanitizers (it configures how
the run_test function works).
+
+ # Configure TSAN (ignored if this isn't a TSAN build).
+ #
+ TSAN_OPTIONS="$TSAN_OPTIONS
suppressions=$ROOT/build-support/tsan-suppressions.txt"
+ TSAN_OPTIONS="$TSAN_OPTIONS history_size=7"
+ # Some tests deliberately fail allocating memory
+ TSAN_OPTIONS="$TSAN_OPTIONS allocator_may_return_null=1"
+ export TSAN_OPTIONS
+
+ UBSAN_OPTIONS="$UBSAN_OPTIONS print_stacktrace=1"
+ UBSAN_OPTIONS="$UBSAN_OPTIONS
suppressions=$ROOT/build-support/ubsan-suppressions.txt"
+ export UBSAN_OPTIONS
+
+ # Enable leak detection even under LLVM 3.4, where it was disabled by
default.
+ # This flag only takes effect when running an ASAN build.
+ # ASAN_OPTIONS="$ASAN_OPTIONS detect_leaks=1"
+ # export ASAN_OPTIONS
+
+ # Set up suppressions for LeakSanitizer
+ LSAN_OPTIONS="$LSAN_OPTIONS
suppressions=$ROOT/build-support/lsan-suppressions.txt"
+ export LSAN_OPTIONS
+}
+
+function run_test() {
+ # Run gtest style tests with sanitizers if they are setup appropriately.
+
+ # gtest won't overwrite old junit test files, resulting in a build failure
+ # even when retries are successful.
+ rm -f $XMLFILE
+
+ $TEST_EXECUTABLE "$@" > $LOGFILE.raw 2>&1
+ STATUS=$?
+ cat $LOGFILE.raw \
+ | ${PYTHON:-python} $ROOT/build-support/asan_symbolize.py \
+ | ${CXXFILT:-c++filt} \
+ | $pipe_cmd 2>&1 | tee $LOGFILE
+ rm -f $LOGFILE.raw
+
+ # TSAN doesn't always exit with a non-zero exit code due to a bug:
+ # mutex errors don't get reported through the normal error reporting
infrastructure.
+ # So we make sure to detect this and exit 1.
+ #
+ # Additionally, certain types of failures won't show up in the standard JUnit
+ # XML output from gtest. We assume that gtest knows better than us and our
+ # regexes in most cases, but for certain errors we delete the resulting xml
+ # file and let our own post-processing step regenerate it.
+ if grep -E -q "ThreadSanitizer|Leak check.*detected leaks" $LOGFILE ; then
+ echo ThreadSanitizer or leak check failures in $LOGFILE
+ STATUS=1
+ rm -f $XMLFILE
+ fi
+}
+
+function print_coredumps() {
+ # The script expects core files relative to the build directory with unique
+ # names per test executable because of the parallel running. So the corefile
+ # patterns must be set with prefix `core.{test-executable}*`:
+ #
+ # In case of macOS:
+ # sudo sysctl -w kern.corefile=core.%N.%P
+ # On Linux:
+ # sudo sysctl -w kernel.core_pattern=core.%e.%p
+ #
+ # and the ulimit must be increased:
+ # ulimit -c unlimited
+
+ # filename is truncated to the first 15 characters in case of linux, so limit
+ # the pattern for the first 15 characters
+ FILENAME=$(basename "${TEST_EXECUTABLE}")
+ FILENAME=$(echo ${FILENAME} | cut -c-15)
+ PATTERN="^core\.${FILENAME}"
+
+ COREFILES=$(ls | grep $PATTERN)
+ if [ -n "$COREFILES" ]; then
+ echo "Found core dump, printing backtrace:"
+
+ for COREFILE in $COREFILES; do
+ # Print backtrace
+ if [ "$(uname)" == "Darwin" ]; then
+ lldb -c "${COREFILE}" --batch --one-line "thread backtrace all -e true"
+ else
+ gdb -c "${COREFILE}" $TEST_EXECUTABLE -ex "thread apply all bt" -ex
"set pagination 0" -batch
+ fi
+ # Remove the coredump, regenerate it via running the test case directly
+ rm "${COREFILE}"
+ done
+ fi
+}
+
+function post_process_tests() {
+ # If we have a LeakSanitizer report, and XML reporting is configured, add a
new test
+ # case result to the XML file for the leak report. Otherwise Jenkins won't
show
+ # us which tests had LSAN errors.
+ if grep -E -q "ERROR: LeakSanitizer: detected memory leaks" $LOGFILE ; then
+ echo Test had memory leaks. Editing XML
+ sed -i.bak -e '/<\/testsuite>/ i\
+ <testcase name="LeakSanitizer" status="run" classname="LSAN">\
+ <failure message="LeakSanitizer failed" type="">\
+ See txt log file for details\
+ </failure>\
+ </testcase>' \
+ $XMLFILE
+ mv $XMLFILE.bak $XMLFILE
+ fi
+}
+
+function run_other() {
+ # Generic run function for test-like executables that aren't actually gtest
+ $TEST_EXECUTABLE "$@" 2>&1 | $pipe_cmd > $LOGFILE
+ STATUS=$?
+}
+
+if [ $RUN_TYPE = "test" ]; then
+ setup_sanitizers
+fi
+
+# Run the actual test.
+for ATTEMPT_NUMBER in $(seq 1 $TEST_EXECUTION_ATTEMPTS) ; do
+ if [ $ATTEMPT_NUMBER -lt $TEST_EXECUTION_ATTEMPTS ]; then
+ # If the test fails, the test output may or may not be left behind,
+ # depending on whether the test cleaned up or exited immediately. Either
+ # way we need to clean it up. We do this by comparing the data directory
+ # contents before and after the test runs, and deleting anything new.
+ #
+ # The comm program requires that its two inputs be sorted.
+ TEST_TMPDIR_BEFORE=$(find $TEST_TMPDIR -maxdepth 1 -type d | sort)
+ fi
+
+ if [ $ATTEMPT_NUMBER -lt $TEST_EXECUTION_ATTEMPTS ]; then
+ # Now delete any new test output.
+ TEST_TMPDIR_AFTER=$(find $TEST_TMPDIR -maxdepth 1 -type d | sort)
+ DIFF=$(comm -13 <(echo "$TEST_TMPDIR_BEFORE") \
+ <(echo "$TEST_TMPDIR_AFTER"))
+ for DIR in $DIFF; do
+ # Multiple tests may be running concurrently. To avoid deleting the
+ # wrong directories, constrain to only directories beginning with the
+ # test name.
+ #
+ # This may delete old test directories belonging to this test, but
+ # that's not typically a concern when rerunning flaky tests.
+ if [[ $DIR =~ ^$TEST_TMPDIR/$TEST_NAME ]]; then
+ echo Deleting leftover flaky test directory "$DIR"
+ rm -Rf "$DIR"
+ fi
+ done
+ fi
+ echo "Running $TEST_NAME, redirecting output into $LOGFILE" \
+ "(attempt ${ATTEMPT_NUMBER}/$TEST_EXECUTION_ATTEMPTS)"
+ if [ $RUN_TYPE = "test" ]; then
+ run_test $*
+ else
+ run_other $*
+ fi
+ if [ "$STATUS" -eq "0" ]; then
+ break
+ elif [ "$ATTEMPT_NUMBER" -lt "$TEST_EXECUTION_ATTEMPTS" ]; then
+ echo Test failed attempt number $ATTEMPT_NUMBER
+ echo Will retry...
+ fi
+done
+
+if [ $RUN_TYPE = "test" ]; then
+ post_process_tests
+fi
+
+print_coredumps
+
+popd
+rm -Rf $TEST_WORKDIR
+
+exit $STATUS