This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main in repository
https://gitbox.apache.org/repos/asf/arrow-flight-sql-postgresql.git
The following commit(s) were added to refs/heads/main by this push:
new a5112a4 Add support for concurrent session (#37)
a5112a4 is described below
commit a5112a4f23793e04fbb598e14bbb06846cd39212
Author: Sutou Kouhei <[email protected]>
AuthorDate: Fri May 5 12:57:43 2023 +0900
Add support for concurrent session (#37)
Closes GH-20
---
benchmark/integer/select-adbc-flight-sql.rb | 6 +-
benchmark/integer/select-adbc-postgresql.rb | 4 +-
benchmark/integer/select.rb | 7 +-
src/afs.cc | 906 ++++++++++++++++++----------
test/test-flight-sql.rb | 23 +-
5 files changed, 628 insertions(+), 318 deletions(-)
diff --git a/benchmark/integer/select-adbc-flight-sql.rb
b/benchmark/integer/select-adbc-flight-sql.rb
index 6155ca2..7b8ac3b 100755
--- a/benchmark/integer/select-adbc-flight-sql.rb
+++ b/benchmark/integer/select-adbc-flight-sql.rb
@@ -24,14 +24,16 @@ require "adbc"
options = {
"driver" => "adbc_driver_flightsql",
"uri" => "grpc://127.0.0.1:15432",
+ "username" => ENV["PGUSER"] || ENV["USER"],
+ "password" => ENV["PGPASSWORD"] || "",
"adbc.flight.sql.rpc.call_header.x-flight-sql-database" => "afs_benchmark",
}
ADBC::Database.open(**options) do |database|
database.connect do |connection|
connection.open_statement do |statement|
before = Time.now
- table, n_rows_affected = statement.query("SELECT * FROM data")
- # p table
+ _table, _n_rows_affected = statement.query("SELECT * FROM data")
+ # p _table
puts("%.3fsec" % (Time.now - before))
end
end
diff --git a/benchmark/integer/select-adbc-postgresql.rb
b/benchmark/integer/select-adbc-postgresql.rb
index 1fba137..f78f1c8 100755
--- a/benchmark/integer/select-adbc-postgresql.rb
+++ b/benchmark/integer/select-adbc-postgresql.rb
@@ -29,8 +29,8 @@ ADBC::Database.open(**options) do |database|
database.connect do |connection|
connection.open_statement do |statement|
before = Time.now
- table, n_rows_affected = statement.query("SELECT * FROM data")
- # p table
+ _table, _n_rows_affected = statement.query("SELECT * FROM data")
+ # p _table
puts("%.3fsec" % (Time.now - before))
end
end
diff --git a/benchmark/integer/select.rb b/benchmark/integer/select.rb
index bc2898c..f676e42 100755
--- a/benchmark/integer/select.rb
+++ b/benchmark/integer/select.rb
@@ -24,12 +24,15 @@ require "arrow-flight-sql"
call_options = ArrowFlight::CallOptions.new
call_options.add_header("x-flight-sql-database", "afs_benchmark")
client = ArrowFlight::Client.new("grpc://127.0.0.1:15432")
+client.authenticate_basic(ENV["PGUSER"] || ENV["USER"],
+ ENV["PGPASSWORD"] || "",
+ call_options)
sql_client = ArrowFlightSQL::Client.new(client)
before = Time.now
info = sql_client.execute("SELECT * FROM data", call_options)
endpoint = info.endpoints.first
reader = sql_client.do_get(endpoint.ticket, call_options)
-table = reader.read_all
-# p table
+_table = reader.read_all
+# p _table
puts("%.3fsec" % (Time.now - before))
diff --git a/src/afs.cc b/src/afs.cc
index 12798c8..b33546f 100644
--- a/src/afs.cc
+++ b/src/afs.cc
@@ -22,6 +22,7 @@ extern "C"
#include <access/xact.h>
#include <executor/spi.h>
#include <fmgr.h>
+#include <lib/dshash.h>
#include <miscadmin.h>
#include <postmaster/bgworker.h>
#include <storage/ipc.h>
@@ -48,7 +49,9 @@ extern "C"
#include <arrow/table_builder.h>
#include <arrow/util/base64.h>
+#include <cinttypes>
#include <condition_variable>
+#include <random>
#include <sstream>
#ifdef __GNUC__
@@ -126,12 +129,6 @@ afs_shmem_request_hook(void)
RequestNamedLWLockTranche(LWLockTrancheName, 1);
}
-struct ConnectData {
- dsa_pointer databaseName;
- dsa_pointer userName;
- dsa_pointer password;
-};
-
struct SharedRingBufferData {
dsa_pointer pointer;
size_t total;
@@ -149,6 +146,13 @@ class SharedRingBuffer {
data->tail = 0;
}
+ static void free_data(SharedRingBufferData* data, dsa_area* area)
+ {
+ if (data->pointer != InvalidDsaPointer)
+ dsa_free(area, data->pointer);
+ initialize_data(data);
+ }
+
SharedRingBuffer(SharedRingBufferData* data, dsa_area* area)
: data_(data), area_(area)
{
@@ -162,11 +166,7 @@ class SharedRingBuffer {
data_->tail = 0;
}
- void free()
- {
- dsa_free(area_, data_->pointer);
- initialize_data(data_);
- }
+ void free() { free_data(data_, area_); }
size_t size() const
{
@@ -302,18 +302,47 @@ class SharedRingBuffer {
}
};
-struct ExecuteData {
+struct SessionData {
+ uint64_t id;
+ arrow::Status status;
+ pid_t executorPID;
+ bool initialized;
+ dsa_pointer databaseName;
+ dsa_pointer userName;
+ dsa_pointer password;
dsa_pointer query;
SharedRingBufferData bufferData;
};
+class SessionReleaser {
+ public:
+ explicit SessionReleaser(dshash_table* sessions, SessionData* data)
+ : sessions_(sessions), data_(data)
+ {
+ }
+
+ ~SessionReleaser() { dshash_release_lock(sessions_, data_); }
+
+ private:
+ dshash_table* sessions_;
+ SessionData* data_;
+};
+
+static dshash_parameters SessionsParams = {
+ sizeof(uint32_t),
+ sizeof(SessionData),
+ dshash_memcmp,
+ dshash_memhash,
+ 0, // Set later because this is determined dynamically.
+};
+
struct SharedData {
+ int trancheID;
dsa_handle handle;
- pid_t executorPID;
+ int sessionsTrancheID;
+ dshash_table_handle sessionsHandle;
pid_t serverPID;
pid_t mainPID;
- ConnectData connectData;
- ExecuteData executeData;
};
class Processor {
@@ -328,84 +357,18 @@ class Processor {
{
}
- virtual ~Processor() { dsa_detach(area_); }
-
- const char* tag() { return tag_; }
-
- SharedRingBuffer create_shared_ring_buffer()
+ virtual ~Processor()
{
- return SharedRingBuffer(&(sharedData_->executeData.bufferData),
area_);
+ if (area_)
+ dsa_detach(area_);
}
+ const char* tag() { return tag_; }
+
void lock_acquire(LWLockMode mode) { LWLockAcquire(lock_,
LW_EXCLUSIVE); }
void lock_release() { LWLockRelease(lock_); }
- void wait_executor_written(SharedRingBuffer* buffer)
- {
- if (ARROW_PREDICT_FALSE(sharedData_->executorPID == InvalidPid))
- {
- ereport(ERROR,
- errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("%s: %s: executor isn't alive", Tag,
tag_));
- }
-
- P("%s: %s: %s: kill executor: %d", Tag, tag_, AFS_FUNC,
sharedData_->executorPID);
- kill(sharedData_->executorPID, SIGUSR1);
- auto size = buffer->size();
- std::unique_lock<std::mutex> lock(mutex_);
- conditionVariable_.wait(lock, [&] {
- P("%s: %s: %s: wait: write: %d:%d",
- Tag,
- tag_,
- AFS_FUNC,
- buffer->size(),
- size);
- return buffer->size() != size;
- });
- }
-
- void wait_server_read(SharedRingBuffer* buffer)
- {
- if (ARROW_PREDICT_FALSE(sharedData_->serverPID == InvalidPid))
- {
- ereport(ERROR,
- errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("%s: %s: server isn't alive", Tag,
tag_));
- }
-
- P("%s: %s: %s: kill server: %d", Tag, tag_, AFS_FUNC,
sharedData_->serverPID);
- kill(sharedData_->serverPID, SIGUSR1);
- auto restSize = buffer->rest_size();
- while (true)
- {
- int events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
- WaitLatch(MyLatch, events, -1, PG_WAIT_EXTENSION);
- if (GotSIGTERM)
- {
- break;
- }
- ResetLatch(MyLatch);
-
- if (GotSIGUSR1)
- {
- GotSIGUSR1 = false;
- P("%s: %s: %s: wait: read: %d:%d",
- Tag,
- tag_,
- AFS_FUNC,
- buffer->rest_size(),
- restSize);
- if (buffer->rest_size() != restSize)
- {
- break;
- }
- }
-
- CHECK_FOR_INTERRUPTS();
- }
- }
-
void signaled()
{
P("%s: %s: signaled: before", Tag, tag_);
@@ -422,10 +385,15 @@ class Processor {
std::condition_variable conditionVariable_;
};
+class Proxy;
class SharedRingBufferInputStream : public arrow::io::InputStream {
public:
- SharedRingBufferInputStream(Processor* processor)
- : arrow::io::InputStream(), processor_(processor),
position_(0), is_open_(true)
+ SharedRingBufferInputStream(Proxy* proxy, SessionData* session)
+ : arrow::io::InputStream(),
+ proxy_(proxy),
+ session_(session),
+ position_(0),
+ is_open_(true)
{
}
@@ -439,33 +407,7 @@ class SharedRingBufferInputStream : public
arrow::io::InputStream {
arrow::Result<int64_t> Tell() const override { return position_; }
- arrow::Result<int64_t> Read(int64_t nBytes, void* out) override
- {
- if (ARROW_PREDICT_FALSE(!is_open_))
- {
- return arrow::Status::IOError(std::string(Tag) + ": " +
processor_->tag() +
- ":
SharedRingBufferInputStream is closed");
- }
- auto buffer =
std::move(processor_->create_shared_ring_buffer());
- size_t rest = static_cast<size_t>(nBytes);
- while (true)
- {
- processor_->lock_acquire(LW_EXCLUSIVE);
- auto readBytes = buffer.read(rest, out);
- processor_->lock_release();
-
- position_ += readBytes;
- rest -= readBytes;
- out = static_cast<uint8_t*>(out) + readBytes;
- if (ARROW_PREDICT_TRUE(rest == 0))
- {
- break;
- }
-
- processor_->wait_executor_written(&buffer);
- }
- return nBytes;
- }
+ arrow::Result<int64_t> Read(int64_t nBytes, void* out) override;
arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nBytes)
override
{
@@ -477,15 +419,17 @@ class SharedRingBufferInputStream : public
arrow::io::InputStream {
}
private:
- Processor* processor_;
+ Proxy* proxy_;
+ SessionData* session_;
int64_t position_;
bool is_open_;
};
+class Executor;
class SharedRingBufferOutputStream : public arrow::io::OutputStream {
public:
- SharedRingBufferOutputStream(Processor* processor)
- : arrow::io::OutputStream(), processor_(processor),
position_(0), is_open_(true)
+ SharedRingBufferOutputStream(Executor* executor)
+ : arrow::io::OutputStream(), executor_(executor), position_(0),
is_open_(true)
{
}
@@ -499,54 +443,24 @@ class SharedRingBufferOutputStream : public
arrow::io::OutputStream {
arrow::Result<int64_t> Tell() const override { return position_; }
- arrow::Status Write(const void* data, int64_t nBytes) override
- {
- if (ARROW_PREDICT_FALSE(!is_open_))
- {
- return arrow::Status::IOError(std::string(Tag) + ": " +
processor_->tag() +
- ":
SharedRingBufferOutputStream is closed");
- }
- if (ARROW_PREDICT_TRUE(nBytes > 0))
- {
- auto buffer =
std::move(processor_->create_shared_ring_buffer());
- size_t rest = static_cast<size_t>(nBytes);
- while (true)
- {
- processor_->lock_acquire(LW_EXCLUSIVE);
- auto writtenSize = buffer.write(data, rest);
- processor_->lock_release();
-
- position_ += writtenSize;
- rest -= writtenSize;
- data = static_cast<const uint8_t*>(data) +
writtenSize;
-
- if (ARROW_PREDICT_TRUE(rest == 0))
- {
- break;
- }
-
- processor_->wait_server_read(&buffer);
- }
- }
- return arrow::Status::OK();
- }
+ arrow::Status Write(const void* data, int64_t nBytes) override;
using arrow::io::OutputStream::Write;
private:
- Processor* processor_;
+ Executor* executor_;
int64_t position_;
bool is_open_;
};
class WorkerProcessor : public Processor {
public:
- explicit WorkerProcessor(const char* tag) : Processor(tag)
+ explicit WorkerProcessor(const char* tag) : Processor(tag),
sessions_(nullptr)
{
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
bool found;
- auto sharedData = static_cast<SharedData*>(
- ShmemInitStruct(SharedDataName, sizeof(SharedData),
&found));
+ sharedData_ = static_cast<SharedData*>(
+ ShmemInitStruct(SharedDataName, sizeof(sharedData_),
&found));
if (!found)
{
LWLockRelease(AddinShmemInitLock);
@@ -554,83 +468,156 @@ class WorkerProcessor : public Processor {
errcode(ERRCODE_INTERNAL_ERROR),
errmsg("%s: %s: shared data isn't created yet",
Tag, tag_));
}
- auto area = dsa_attach(sharedData->handle);
+ area_ = dsa_attach(sharedData_->handle);
+ SessionsParams.tranche_id = sharedData_->sessionsTrancheID;
+ sessions_ =
+ dshash_attach(area_, &SessionsParams,
sharedData_->sessionsHandle, nullptr);
lock_ = &(GetNamedLWLockTranche(LWLockTrancheName)[0].lock);
LWLockRelease(AddinShmemInitLock);
- sharedData_ = sharedData;
- area_ = area;
}
+
+ ~WorkerProcessor() override { dshash_detach(sessions_); }
+
+ protected:
+ void delete_session(SessionData* session)
+ {
+ session->status.~Status();
+ if (DsaPointerIsValid(session->databaseName))
+ dsa_free(area_, session->databaseName);
+ if (DsaPointerIsValid(session->userName))
+ dsa_free(area_, session->userName);
+ if (DsaPointerIsValid(session->password))
+ dsa_free(area_, session->password);
+ if (DsaPointerIsValid(session->query))
+ dsa_free(area_, session->query);
+ SharedRingBuffer::free_data(&(session->bufferData), area_);
+ dshash_delete_entry(sessions_, session);
+ }
+
+ protected:
+ dshash_table* sessions_;
};
class Executor : public WorkerProcessor {
public:
- explicit Executor() : WorkerProcessor("executor") {}
+ explicit Executor(uint64_t sessionID)
+ : WorkerProcessor("executor"),
+ sessionID_(sessionID),
+ session_(nullptr),
+ connected_(false)
+ {
+ }
void open()
{
pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ":
opening").c_str());
- LWLockAcquire(lock_, LW_EXCLUSIVE);
- BackgroundWorkerInitializeConnection(
- static_cast<const char*>(
- dsa_get_address(area_,
sharedData_->connectData.databaseName)),
- static_cast<const char*>(
- dsa_get_address(area_,
sharedData_->connectData.userName)),
- 0);
- unsetSharedString(sharedData_->connectData.databaseName);
- unsetSharedString(sharedData_->connectData.userName);
- unsetSharedString(sharedData_->connectData.password);
+ session_ = static_cast<SessionData*>(dshash_find(sessions_,
&sessionID_, false));
+ auto databaseName =
+ static_cast<const char*>(dsa_get_address(area_,
session_->databaseName));
+ auto userName =
+ static_cast<const char*>(dsa_get_address(area_,
session_->userName));
+ // TODO: Check password. See src/backend/libpq/auth.c
+ BackgroundWorkerInitializeConnection(databaseName, userName, 0);
{
- SharedRingBuffer
buffer(&(sharedData_->executeData.bufferData), area_);
+ SharedRingBuffer buffer(&(session_->bufferData), area_);
// TODO: Customizable.
buffer.allocate(1L * 1024L * 1024L);
}
- LWLockRelease(lock_);
StartTransactionCommand();
SPI_connect();
pgstat_report_activity(STATE_IDLE, NULL);
+ session_->initialized = true;
+ connected_ = true;
+ P("%s: %s: %s: kill server: %d", Tag, tag_, AFS_FUNC,
sharedData_->serverPID);
+ kill(sharedData_->serverPID, SIGUSR1);
}
void close()
{
pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ":
closing").c_str());
- SPI_finish();
- CommitTransactionCommand();
- LWLockAcquire(lock_, LW_EXCLUSIVE);
+ if (connected_)
{
- SharedRingBuffer
buffer(&(sharedData_->executeData.bufferData), area_);
- buffer.free();
+ SPI_finish();
+ CommitTransactionCommand();
+ {
+ SharedRingBuffer
buffer(&(session_->bufferData), area_);
+ buffer.free();
+ }
+ delete_session(session_);
+ }
+ else
+ {
+ // TODO: Improve failed to connect case.
+ session_->status = arrow::Status::Invalid("failed to
connect");
+ session_->initialized = true;
+ P("%s: %s: %s: kill server: %d", Tag, tag_, AFS_FUNC,
sharedData_->serverPID);
+ kill(sharedData_->serverPID, SIGUSR1);
}
- sharedData_->executorPID = InvalidPid;
- LWLockRelease(lock_);
pgstat_report_activity(STATE_IDLE, NULL);
}
- void signaled()
+ SharedRingBuffer create_shared_ring_buffer()
+ {
+ return SharedRingBuffer(&(session_->bufferData), area_);
+ }
+
+ void wait_server_read(SharedRingBuffer* buffer)
{
- P("%s: %s: signaled: before: %d", Tag, tag_,
sharedData_->executeData.query);
- P("signaled: before: %d", sharedData_->executeData.query);
- if (DsaPointerIsValid(sharedData_->executeData.query))
+ if (ARROW_PREDICT_FALSE(sharedData_->serverPID == InvalidPid))
{
- execute();
+ ereport(ERROR,
+ errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("%s: %s: server isn't alive", Tag,
tag_));
}
- else
+
+ P("%s: %s: %s: kill server: %d", Tag, tag_, AFS_FUNC,
sharedData_->serverPID);
+ kill(sharedData_->serverPID, SIGUSR1);
+ auto restSize = buffer->rest_size();
+ while (true)
{
- Processor::signaled();
+ int events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+ WaitLatch(MyLatch, events, -1, PG_WAIT_EXTENSION);
+ if (GotSIGTERM)
+ {
+ break;
+ }
+ ResetLatch(MyLatch);
+
+ if (GotSIGUSR1)
+ {
+ GotSIGUSR1 = false;
+ P("%s: %s: %s: wait: read: %d:%d",
+ Tag,
+ tag_,
+ AFS_FUNC,
+ buffer->rest_size(),
+ restSize);
+ if (buffer->rest_size() != restSize)
+ {
+ break;
+ }
+ }
+
+ CHECK_FOR_INTERRUPTS();
}
- P("%s: %s: signaled: after: %d", Tag, tag_,
sharedData_->executeData.query);
}
- private:
- void unsetSharedString(dsa_pointer& pointer)
+ void signaled()
{
- if (!DsaPointerIsValid(pointer))
+ P("%s: %s: signaled: before: %d", Tag, tag_, session_->query);
+ P("signaled: before: %d", session_->query);
+ if (DsaPointerIsValid(session_->query))
{
- return;
+ execute();
+ }
+ else
+ {
+ Processor::signaled();
}
- dsa_free(area_, pointer);
- pointer = InvalidDsaPointer;
+ P("%s: %s: signaled: after: %d", Tag, tag_, session_->query);
}
+ private:
void execute()
{
pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ":
executing").c_str());
@@ -638,13 +625,13 @@ class Executor : public WorkerProcessor {
PushActiveSnapshot(GetTransactionSnapshot());
LWLockAcquire(lock_, LW_EXCLUSIVE);
- auto query = static_cast<const char*>(
- dsa_get_address(area_, sharedData_->executeData.query));
+ std::string query(
+ static_cast<const char*>(dsa_get_address(area_,
session_->query)));
+ dsa_free(area_, session_->query);
+ session_->query = InvalidDsaPointer;
SetCurrentStatementStartTimestamp();
- P("%s: %s: execute: %s", Tag, tag_, query);
- auto result = SPI_execute(query, true, 0);
- dsa_free(area_, sharedData_->executeData.query);
- sharedData_->executeData.query = InvalidDsaPointer;
+ P("%s: %s: execute: %s", Tag, tag_, query.c_str());
+ auto result = SPI_execute(query.c_str(), true, 0);
LWLockRelease(lock_);
if (result == SPI_OK_SELECT)
@@ -654,17 +641,25 @@ class Executor : public WorkerProcessor {
auto status = write();
if (!status.ok())
{
- ereport(ERROR,
- errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("%s: %s: failed to write", Tag,
tag_));
+ session_->status = status;
}
}
+ else
+ {
+ session_->status = arrow::Status::Invalid(Tag,
+ ": ",
+ tag_,
+ ": failed to
run a query: <",
+ query,
+ ">: ",
+
SPI_result_code_string(result));
+ }
PopActiveSnapshot();
if (sharedData_->serverPID != InvalidPid)
{
- P("%s: %s: kill server: %s", Tag, tag_,
sharedData_->serverPID);
+ P("%s: %s: kill server: %d", Tag, tag_,
sharedData_->serverPID);
kill(sharedData_->serverPID, SIGUSR1);
}
@@ -773,46 +768,164 @@ class Executor : public WorkerProcessor {
ARROW_RETURN_NOT_OK(writer->Close());
return output.Close();
}
+
+ uint64_t sessionID_;
+ SessionData* session_;
+ bool connected_;
};
+arrow::Status
+SharedRingBufferOutputStream::Write(const void* data, int64_t nBytes)
+{
+ if (ARROW_PREDICT_FALSE(!is_open_))
+ {
+ return arrow::Status::IOError(std::string(Tag) + ": " +
executor_->tag() +
+ ": SharedRingBufferOutputStream
is closed");
+ }
+ if (ARROW_PREDICT_TRUE(nBytes > 0))
+ {
+ auto buffer = std::move(executor_->create_shared_ring_buffer());
+ size_t rest = static_cast<size_t>(nBytes);
+ while (true)
+ {
+ executor_->lock_acquire(LW_EXCLUSIVE);
+ auto writtenSize = buffer.write(data, rest);
+ executor_->lock_release();
+
+ position_ += writtenSize;
+ rest -= writtenSize;
+ data = static_cast<const uint8_t*>(data) + writtenSize;
+
+ if (ARROW_PREDICT_TRUE(rest == 0))
+ {
+ break;
+ }
+
+ executor_->wait_server_read(&buffer);
+ }
+ }
+ return arrow::Status::OK();
+}
+
class Proxy : public WorkerProcessor {
public:
- explicit Proxy() : WorkerProcessor("proxy") {}
+ explicit Proxy()
+ : WorkerProcessor("proxy"), randomSeed_(),
randomEngine_(randomSeed_())
+ {
+ }
- arrow::Status connect(const std::string& databaseName,
- const std::string& userName,
- const std::string& password)
+ SharedRingBuffer create_shared_ring_buffer(SessionData* session)
{
- if (sharedData_->executorPID != InvalidPid)
+ return SharedRingBuffer(&(session->bufferData), area_);
+ }
+
+ void wait_executor_written(SessionData* session, SharedRingBuffer*
buffer)
+ {
+ if (ARROW_PREDICT_FALSE(session->executorPID == InvalidPid))
{
- return arrow::Status::OK();
+ ereport(ERROR,
+ errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("%s: %s: executor isn't alive", Tag,
tag_));
}
- LWLockAcquire(lock_, LW_EXCLUSIVE);
- setSharedString(sharedData_->connectData.databaseName,
databaseName);
- setSharedString(sharedData_->connectData.userName, userName);
- setSharedString(sharedData_->connectData.password, password);
- LWLockRelease(lock_);
+
+ P("%s: %s: %s: kill executor: %d", Tag, tag_, AFS_FUNC,
session->executorPID);
+ kill(session->executorPID, SIGUSR1);
+ auto size = buffer->size();
+ std::unique_lock<std::mutex> lock(mutex_);
+ conditionVariable_.wait(lock, [&] {
+ P("%s: %s: %s: wait: write: %d:%d",
+ Tag,
+ tag_,
+ AFS_FUNC,
+ buffer->size(),
+ size);
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return true;
+ }
+ return buffer->size() != size;
+ });
+ }
+
+ arrow::Result<uint64_t> connect(const std::string& databaseName,
+ const std::string& userName,
+ const std::string& password)
+ {
+ auto session = create_session(databaseName, userName, password);
+ auto id = session->id;
+ dshash_release_lock(sessions_, session);
kill(sharedData_->mainPID, SIGUSR1);
{
std::unique_lock<std::mutex> lock(mutex_);
- conditionVariable_.wait(
- lock, [&] { return sharedData_->executorPID !=
InvalidPid; });
+ conditionVariable_.wait(lock, [&] {
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return true;
+ }
+ session =
static_cast<SessionData*>(dshash_find(sessions_, &id, false));
+ if (!session)
+ {
+ return true;
+ }
+ const auto initialized = session->initialized;
+ dshash_release_lock(sessions_, session);
+ return initialized;
+ });
}
- return arrow::Status::OK();
+ session = static_cast<SessionData*>(dshash_find(sessions_, &id,
false));
+ if (!session)
+ {
+ return arrow::Status::Invalid("session is stale: ", id);
+ }
+ if (!session->status.ok())
+ {
+ auto status = session->status;
+ delete_session(session);
+ return status;
+ }
+ dshash_release_lock(sessions_, session);
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return arrow::Status::Invalid("interrupted");
+ }
+ return id;
}
- arrow::Result<std::shared_ptr<arrow::Schema>> execute(const
std::string& query)
+ bool is_valid_session(uint64_t sessionID)
{
- LWLockAcquire(lock_, LW_EXCLUSIVE);
- setSharedString(sharedData_->executeData.query, query);
- LWLockRelease(lock_);
- if (sharedData_->executorPID != InvalidPid)
+ auto session = find_session(sessionID);
+ if (session)
+ {
+ dshash_release_lock(sessions_, session);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ arrow::Result<std::shared_ptr<arrow::Schema>> execute(uint64_t
sessionID,
+ const
std::string& query)
+ {
+ auto session = find_session(sessionID);
+ SessionReleaser sessionReleaser(sessions_, session);
+ set_shared_string(session->query, query);
+ if (session->executorPID != InvalidPid)
{
- P("%s: %s: execute: kill executor: %d", Tag, tag_,
sharedData_->executorPID);
- kill(sharedData_->executorPID, SIGUSR1);
+ P("%s: %s: execute: kill executor: %d", Tag, tag_,
session->executorPID);
+ kill(session->executorPID, SIGUSR1);
+ }
+ {
+ auto buffer =
std::move(create_shared_ring_buffer(session));
+ std::unique_lock<std::mutex> lock(mutex_);
+ conditionVariable_.wait(lock, [&] {
+ P("%s: %s: %s: wait: execute", Tag, tag_,
AFS_FUNC);
+ return !session->status.ok() || buffer.size() >
0;
+ });
}
P("%s: %s: execute: open", Tag, tag_);
- auto input =
std::make_shared<SharedRingBufferInputStream>(this);
+ auto input =
std::make_shared<SharedRingBufferInputStream>(this, session);
// Read schema only stream format data.
ARROW_ASSIGN_OR_RAISE(auto reader,
arrow::ipc::RecordBatchStreamReader::Open(input));
@@ -830,16 +943,62 @@ class Proxy : public WorkerProcessor {
return reader->schema();
}
- arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> read()
+ arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> read(uint64_t
sessionID)
{
- auto input =
std::make_shared<SharedRingBufferInputStream>(this);
+ auto session = find_session(sessionID);
+ SessionReleaser sessionReleaser(sessions_, session);
+ auto input =
std::make_shared<SharedRingBufferInputStream>(this, session);
// Read another stream format data with record batches.
return arrow::ipc::RecordBatchStreamReader::Open(input);
}
private:
- void setSharedString(dsa_pointer& pointer, const std::string& input)
+ SessionData* create_session(const std::string& databaseName,
+ const std::string& userName,
+ const std::string& password)
{
+ LWLockAcquire(lock_, LW_EXCLUSIVE);
+ uint64_t id = 0;
+ SessionData* session = nullptr;
+ do
+ {
+ id = randomEngine_();
+ if (id == 0)
+ {
+ continue;
+ }
+ bool found = false;
+ session =
+
static_cast<SessionData*>(dshash_find_or_insert(sessions_, &id, &found));
+ if (!found)
+ {
+ break;
+ }
+ } while (true);
+ new (&(session->status)) arrow::Status;
+ session->executorPID = InvalidPid;
+ session->initialized = false;
+ set_shared_string(session->databaseName, databaseName);
+ set_shared_string(session->userName, userName);
+ set_shared_string(session->password, password);
+ session->query = InvalidDsaPointer;
+ SharedRingBuffer::initialize_data(&(session->bufferData));
+ LWLockRelease(lock_);
+ return session;
+ }
+
+ SessionData* find_session(uint64_t sessionID)
+ {
+ return static_cast<SessionData*>(dshash_find(sessions_,
&sessionID, false));
+ }
+
+ void set_shared_string(dsa_pointer& pointer, const std::string& input)
+ {
+ if (DsaPointerIsValid(pointer))
+ {
+ dsa_free(area_, pointer);
+ pointer = InvalidDsaPointer;
+ }
if (input.empty())
{
return;
@@ -847,16 +1006,53 @@ class Proxy : public WorkerProcessor {
pointer = dsa_allocate(area_, input.size() + 1);
memcpy(dsa_get_address(area_, pointer), input.c_str(),
input.size() + 1);
}
+
+ std::random_device randomSeed_;
+ std::mt19937_64 randomEngine_;
};
+arrow::Result<int64_t>
+SharedRingBufferInputStream::Read(int64_t nBytes, void* out)
+{
+ if (ARROW_PREDICT_FALSE(!is_open_))
+ {
+ return arrow::Status::IOError(std::string(Tag) + ": " +
proxy_->tag() +
+ ": SharedRingBufferInputStream is
closed");
+ }
+ auto buffer = std::move(proxy_->create_shared_ring_buffer(session_));
+ size_t rest = static_cast<size_t>(nBytes);
+ while (true)
+ {
+ proxy_->lock_acquire(LW_EXCLUSIVE);
+ auto readBytes = buffer.read(rest, out);
+ proxy_->lock_release();
+
+ position_ += readBytes;
+ rest -= readBytes;
+ out = static_cast<uint8_t*>(out) + readBytes;
+ if (ARROW_PREDICT_TRUE(rest == 0))
+ {
+ break;
+ }
+
+ proxy_->wait_executor_written(session_, &buffer);
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return arrow::Status::IOError(std::string(Tag) + ": " +
proxy_->tag() +
+ ": interrupted");
+ }
+ }
+ return nBytes;
+}
+
class MainProcessor : public Processor {
public:
- MainProcessor() : Processor("main")
+ MainProcessor() : Processor("main"), sessions_(nullptr)
{
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
bool found;
- auto sharedData = static_cast<SharedData*>(
- ShmemInitStruct(SharedDataName, sizeof(SharedData),
&found));
+ sharedData_ = static_cast<SharedData*>(
+ ShmemInitStruct(SharedDataName, sizeof(sharedData_),
&found));
if (found)
{
LWLockRelease(AddinShmemInitLock);
@@ -864,21 +1060,21 @@ class MainProcessor : public Processor {
errcode(ERRCODE_INTERNAL_ERROR),
errmsg("%s: %s: shared data is already
created", Tag, tag_));
}
- auto area = dsa_create(LWLockNewTrancheId());
- sharedData->handle = dsa_get_handle(area);
- sharedData->executorPID = InvalidPid;
- sharedData->serverPID = InvalidPid;
- sharedData->mainPID = MyProcPid;
- sharedData->connectData.databaseName = InvalidDsaPointer;
- sharedData->connectData.userName = InvalidDsaPointer;
- sharedData->connectData.password = InvalidDsaPointer;
-
SharedRingBuffer::initialize_data(&(sharedData->executeData.bufferData));
+ sharedData_->trancheID = LWLockNewTrancheId();
+ sharedData_->sessionsTrancheID = LWLockNewTrancheId();
+ area_ = dsa_create(sharedData_->trancheID);
+ sharedData_->handle = dsa_get_handle(area_);
+ SessionsParams.tranche_id = sharedData_->sessionsTrancheID;
+ sessions_ = dshash_create(area_, &SessionsParams, nullptr);
+ sharedData_->sessionsHandle =
dshash_get_hash_table_handle(sessions_);
+ sharedData_->serverPID = InvalidPid;
+ sharedData_->mainPID = MyProcPid;
lock_ = &(GetNamedLWLockTranche(LWLockTrancheName)[0].lock);
LWLockRelease(AddinShmemInitLock);
- sharedData_ = sharedData;
- area_ = area;
}
+ ~MainProcessor() override { dshash_destroy(sessions_); }
+
BackgroundWorkerHandle* start_server()
{
BackgroundWorker worker = {0};
@@ -902,34 +1098,68 @@ class MainProcessor : public Processor {
return handle;
}
- void process_connect_request()
+ void process_connect_requests()
{
- if (!DsaPointerIsValid(sharedData_->connectData.databaseName))
+ dshash_seq_status sessionsStatus;
+ dshash_seq_init(&sessionsStatus, sessions_, false);
+ SessionData* session;
+ while ((session =
static_cast<SessionData*>(dshash_seq_next(&sessionsStatus))))
{
- return;
- }
+ if (session->initialized)
+ {
+ continue;
+ }
- BackgroundWorker worker = {0};
- // TODO: Add executor ID to bgw_name
- snprintf(worker.bgw_name, BGW_MAXLEN, "%s: executor", Tag);
- snprintf(worker.bgw_type, BGW_MAXLEN, "%s: executor", Tag);
- worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
- worker.bgw_start_time = BgWorkerStart_ConsistentState;
- worker.bgw_restart_time = BGW_NEVER_RESTART;
- snprintf(worker.bgw_library_name, BGW_MAXLEN, "%s",
LibraryName);
- snprintf(worker.bgw_function_name, BGW_MAXLEN, "afs_executor");
- worker.bgw_main_arg = 0;
- worker.bgw_notify_pid = MyProcPid;
- BackgroundWorkerHandle* handle;
- if (!RegisterDynamicBackgroundWorker(&worker, &handle))
- {
- ereport(ERROR,
- errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("%s: %s: failed to start executor", Tag,
tag_));
+ BackgroundWorker worker = {0};
+ snprintf(
+ worker.bgw_name, BGW_MAXLEN, "%s: executor: %"
PRIu64, Tag, session->id);
+ snprintf(
+ worker.bgw_type, BGW_MAXLEN, "%s: executor: %"
PRIu64, Tag, session->id);
+ worker.bgw_flags =
+ BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
+ worker.bgw_start_time = BgWorkerStart_ConsistentState;
+ worker.bgw_restart_time = BGW_NEVER_RESTART;
+ snprintf(worker.bgw_library_name, BGW_MAXLEN, "%s",
LibraryName);
+ snprintf(worker.bgw_function_name, BGW_MAXLEN,
"afs_executor");
+ worker.bgw_main_arg = Int64GetDatum(session->id);
+ worker.bgw_notify_pid = MyProcPid;
+ BackgroundWorkerHandle* handle;
+ if (RegisterDynamicBackgroundWorker(&worker, &handle))
+ {
+ WaitForBackgroundWorkerStartup(handle,
&(session->executorPID));
+ }
+ else
+ {
+ session->status = arrow::Status::UnknownError(
+ Tag, ": ", tag_, ": failed to start
executor: ", session->id);
+ }
}
- WaitForBackgroundWorkerStartup(handle,
&(sharedData_->executorPID));
+ dshash_seq_term(&sessionsStatus);
kill(sharedData_->serverPID, SIGUSR1);
}
+
+ private:
+ dshash_table* sessions_;
+};
+
+class HeaderAuthServerMiddleware : public arrow::flight::ServerMiddleware {
+ public:
+ explicit HeaderAuthServerMiddleware(uint64_t sessionID) :
sessionID_(sessionID) {}
+
+ void SendingHeaders(arrow::flight::AddCallHeaders* outgoing_headers)
override
+ {
+ outgoing_headers->AddHeader("authorization",
+ std::string("Bearer ") +
std::to_string(sessionID_));
+ }
+
+ void CallCompleted(const arrow::Status& status) override {}
+
+ std::string name() const override { return
"HeaderAuthServerMiddleware"; }
+
+ uint64_t session_id() { return sessionID_; }
+
+ private:
+ uint64_t sessionID_;
};
class HeaderAuthServerMiddlewareFactory : public
arrow::flight::ServerMiddlewareFactory {
@@ -939,9 +1169,10 @@ class HeaderAuthServerMiddlewareFactory : public
arrow::flight::ServerMiddleware
{
}
- arrow::Status StartCall(const arrow::flight::CallInfo& info,
- const arrow::flight::CallHeaders&
incoming_headers,
-
std::shared_ptr<arrow::flight::ServerMiddleware>* middleware)
+ arrow::Status StartCall(
+ const arrow::flight::CallInfo& info,
+ const arrow::flight::CallHeaders& incoming_headers,
+ std::shared_ptr<arrow::flight::ServerMiddleware>* middleware)
override
{
std::string databaseName("postgres");
auto databaseHeader =
incoming_headers.find("x-flight-sql-database");
@@ -949,25 +1180,69 @@ class HeaderAuthServerMiddlewareFactory : public
arrow::flight::ServerMiddleware
{
databaseName = databaseHeader->second;
}
- std::string userName("");
- std::string password("");
auto authorizationHeader =
incoming_headers.find("authorization");
- if (authorizationHeader != incoming_headers.end())
+ if (authorizationHeader == incoming_headers.end())
+ {
+ return arrow::flight::MakeFlightError(
+ arrow::flight::FlightStatusCode::Unauthenticated,
+ "No authorization header");
+ }
+ auto value = authorizationHeader->second;
+ std::stringstream valueStream{std::string(value)};
+ std::string type("");
+ std::getline(valueStream, type, ' ');
+ if (type == "Basic")
{
std::stringstream decodedStream(
- arrow::util::base64_decode(authorizationHeader->second));
+ arrow::util::base64_decode(value.substr(valueStream.tellg())));
+ std::string userName("");
+ std::string password("");
std::getline(decodedStream, userName, ':');
std::getline(decodedStream, password);
+ auto sessionIDResult = proxy_->connect(databaseName,
userName, password);
+ if (!sessionIDResult.status().ok())
+ {
+ return arrow::flight::MakeFlightError(
+ arrow::flight::FlightStatusCode::Unauthenticated,
+ sessionIDResult.status().ToString());
+ }
+ auto sessionID = *sessionIDResult;
+ *middleware =
std::make_shared<HeaderAuthServerMiddleware>(sessionID);
+ return arrow::Status::OK();
}
- auto status = proxy_->connect(databaseName, userName, password);
- if (status.ok())
+ else if (type == "Bearer")
{
- return status;
+ std::string
sessionIDString(value.substr(valueStream.tellg()));
+ if (sessionIDString.size() == 0)
+ {
+ return arrow::flight::MakeFlightError(
+ arrow::flight::FlightStatusCode::Unauthorized,
+ std::string("invalid Bearer token"));
+ }
+ auto start = sessionIDString.c_str();
+ char* end = nullptr;
+ uint64_t sessionID = std::strtoull(start, &end, 10);
+ if (end[0] != '\0')
+ {
+ return arrow::flight::MakeFlightError(
+
arrow::flight::FlightStatusCode::Unauthorized,
+ std::string("invalid Bearer token"));
+ }
+ if (!proxy_->is_valid_session(sessionID))
+ {
+ return arrow::flight::MakeFlightError(
+
arrow::flight::FlightStatusCode::Unauthorized,
+ std::string("invalid Bearer token"));
+ }
+ *middleware =
std::make_shared<HeaderAuthServerMiddleware>(sessionID);
+ return arrow::Status::OK();
}
else
{
return arrow::flight::MakeFlightError(
- arrow::flight::FlightStatusCode::Unauthenticated, status.ToString());
+ arrow::flight::FlightStatusCode::Unauthenticated,
+ std::string("authorization header must use Basic or Bearer: <") + type +
+ std::string(">"));
}
}
@@ -989,8 +1264,9 @@ class FlightSQLServer : public arrow::flight::sql::FlightSqlServerBase {
const arrow::flight::sql::StatementQuery& command,
const arrow::flight::FlightDescriptor& descriptor)
{
+ ARROW_ASSIGN_OR_RAISE(auto sessionID, session_id(context));
const auto& query = command.query;
- ARROW_ASSIGN_OR_RAISE(auto schema, proxy_->execute(query));
+ ARROW_ASSIGN_OR_RAISE(auto schema, proxy_->execute(sessionID,
query));
ARROW_ASSIGN_OR_RAISE(auto ticket,
arrow::flight::sql::CreateStatementQueryTicket(query));
std::vector<arrow::flight::FlightEndpoint> endpoints{
@@ -1005,11 +1281,24 @@ class FlightSQLServer : public
arrow::flight::sql::FlightSqlServerBase {
const arrow::flight::ServerCallContext& context,
const arrow::flight::sql::StatementQueryTicket& command)
{
- ARROW_ASSIGN_OR_RAISE(auto reader, proxy_->read());
+ ARROW_ASSIGN_OR_RAISE(auto sessionID, session_id(context));
+ ARROW_ASSIGN_OR_RAISE(auto reader, proxy_->read(sessionID));
return
std::make_unique<arrow::flight::RecordBatchStream>(reader);
}
private:
+ arrow::Result<uint64_t> session_id(const
arrow::flight::ServerCallContext& context)
+ {
+ auto middleware = reinterpret_cast<HeaderAuthServerMiddleware*>(
+ context.GetMiddleware("header-auth"));
+ if (!middleware)
+ {
+ return arrow::flight::MakeFlightError(
+ arrow::flight::FlightStatusCode::Unauthenticated, "no authorization");
+ }
+ return middleware->session_id();
+ }
+
Proxy* proxy_;
};
@@ -1018,6 +1307,7 @@ afs_server_internal(Proxy* proxy)
{
ARROW_ASSIGN_OR_RAISE(auto location,
arrow::flight::Location::Parse(URI));
arrow::flight::FlightServerOptions options(location);
+ options.auth_handler =
std::make_unique<arrow::flight::NoOpAuthHandler>();
options.middleware.push_back(
{"header-auth",
std::make_shared<HeaderAuthServerMiddlewareFactory>(proxy)});
FlightSQLServer flightSQLServer(proxy);
@@ -1050,6 +1340,16 @@ afs_server_internal(Proxy* proxy)
} // namespace
+static void
+afs_executor_before_shmem_exit(int code, Datum arg)
+{
+ // TODO: This doesn't work. We need to improve
+ // BackgroundWorkerInitializeConnection() failed case.
+ auto executor = reinterpret_cast<Executor*>(DatumGetPointer(arg));
+ executor->close();
+ delete executor;
+}
+
extern "C" void
afs_executor(Datum arg)
{
@@ -1058,42 +1358,40 @@ afs_executor(Datum arg)
pqsignal(SIGUSR1, afs_sigusr1);
BackgroundWorkerUnblockSignals();
+ auto executor = new Executor(DatumGetInt64(arg));
+ before_shmem_exit(afs_executor_before_shmem_exit,
PointerGetDatum(executor));
+ executor->open();
+
+ while (!GotSIGTERM)
{
- Executor executor;
- executor.open();
- while (!GotSIGTERM)
+ int events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+ const long timeout = SessionTimeout * 1000;
+ if (timeout >= 0)
{
- int events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
- const long timeout = SessionTimeout * 1000;
- if (timeout >= 0)
- {
- events |= WL_TIMEOUT;
- }
- auto conditions = WaitLatch(MyLatch, events, timeout,
PG_WAIT_EXTENSION);
-
- if (conditions & WL_TIMEOUT)
- {
- break;
- }
+ events |= WL_TIMEOUT;
+ }
+ auto conditions = WaitLatch(MyLatch, events, timeout,
PG_WAIT_EXTENSION);
- ResetLatch(MyLatch);
+ if (conditions & WL_TIMEOUT)
+ {
+ break;
+ }
- if (GotSIGHUP)
- {
- GotSIGHUP = false;
- ProcessConfigFile(PGC_SIGHUP);
- }
+ ResetLatch(MyLatch);
- if (GotSIGUSR1)
- {
- GotSIGUSR1 = false;
- executor.signaled();
- }
+ if (GotSIGHUP)
+ {
+ GotSIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
- CHECK_FOR_INTERRUPTS();
+ if (GotSIGUSR1)
+ {
+ GotSIGUSR1 = false;
+ executor->signaled();
}
- // TODO: Use before_shmem_exit()
- executor.close();
+
+ CHECK_FOR_INTERRUPTS();
}
proc_exit(0);
@@ -1149,7 +1447,7 @@ afs_main(Datum arg)
if (GotSIGUSR1)
{
GotSIGUSR1 = false;
- processor.process_connect_request();
+ processor.process_connect_requests();
}
CHECK_FOR_INTERRUPTS();
diff --git a/test/test-flight-sql.rb b/test/test-flight-sql.rb
index 2b26e09..71a2174 100644
--- a/test/test-flight-sql.rb
+++ b/test/test-flight-sql.rb
@@ -18,14 +18,23 @@
class FlightSQLTest < Test::Unit::TestCase
include Helper::Sandbox
+ setup do
+ unless flight_client.respond_to?(:authenticate_basic)
+ omit("ArrowFlight::Client#authenticate_basic is needed" )
+ end
+ @options = ArrowFlight::CallOptions.new
+ @options.add_header("x-flight-sql-database", @test_db_name)
+ user = @postgresql.user
+ password = ""
+ flight_client.authenticate_basic(user, password, @options)
+ end
+
def test_select_int32
- options = ArrowFlight::CallOptions.new
- options.add_header("x-flight-sql-database", @test_db_name)
- info = flight_sql_client.execute("SELECT 1 AS value", options)
+ info = flight_sql_client.execute("SELECT 1 AS value", @options)
assert_equal(Arrow::Schema.new(value: :int32),
info.get_schema)
endpoint = info.endpoints.first
- reader = flight_sql_client.do_get(endpoint.ticket, options)
+ reader = flight_sql_client.do_get(endpoint.ticket, @options)
assert_equal(Arrow::Table.new(value: Arrow::Int32Array.new([1])),
reader.read_all)
end
@@ -34,13 +43,11 @@ class FlightSQLTest < Test::Unit::TestCase
run_sql("CREATE TABLE data (value integer)")
run_sql("INSERT INTO data VALUES (1), (-2), (3)")
- options = ArrowFlight::CallOptions.new
- options.add_header("x-flight-sql-database", @test_db_name)
- info = flight_sql_client.execute("SELECT * FROM data", options)
+ info = flight_sql_client.execute("SELECT * FROM data", @options)
assert_equal(Arrow::Schema.new(value: :int32),
info.get_schema)
endpoint = info.endpoints.first
- reader = flight_sql_client.do_get(endpoint.ticket, options)
+ reader = flight_sql_client.do_get(endpoint.ticket, @options)
assert_equal(Arrow::Table.new(value: Arrow::Int32Array.new([1, -2, 3])),
reader.read_all)
end