This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/arrow-flight-sql-postgresql.git
The following commit(s) were added to refs/heads/main by this push:
new 7bbb79f Add support for large response (#24)
7bbb79f is described below
commit 7bbb79f6a259a3e4d9f2953b9f76728f937890f4
Author: Sutou Kouhei <[email protected]>
AuthorDate: Wed Mar 1 08:38:02 2023 +0900
Add support for large response (#24)
Closes GH-21
---
CMakeLists.txt | 2 +
src/afs.cc | 617 +++++++++++++++++++++++++++++++++++++++++++++++++--------
2 files changed, 538 insertions(+), 81 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5e370af..0799936 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -85,6 +85,8 @@ find_package(ArrowFlightSql REQUIRED)
add_library(arrow_flight_sql MODULE ${AFS_SOURCES})
set_target_properties(arrow_flight_sql PROPERTIES PREFIX "")
+target_compile_definitions(arrow_flight_sql
+ PRIVATE "$<$<CONFIG:Debug>:AFS_DEBUG>")
target_link_libraries(arrow_flight_sql postgresql
ArrowFlightSql::arrow_flight_sql_shared)
execute_process(
diff --git a/src/afs.cc b/src/afs.cc
index f070974..79597a2 100644
--- a/src/afs.cc
+++ b/src/afs.cc
@@ -51,6 +51,18 @@ extern "C"
#include <condition_variable>
#include <sstream>
+#ifdef __GNUC__
+#define AFS_FUNC __PRETTY_FUNCTION__
+#else
+#define AFS_FUNC __func__
+#endif
+
+#ifdef AFS_DEBUG
+#define P(...) ereport(DEBUG5, errmsg_internal(__VA_ARGS__))
+#else
+#define P(...)
+#endif
+
extern "C"
{
PG_MODULE_MAGIC;
@@ -108,20 +120,183 @@ struct ConnectData {
dsa_pointer password;
};
-struct Buffer {
- dsa_pointer data;
+struct SharedRingBufferData {
+ dsa_pointer pointer;
size_t total;
- size_t used;
+ size_t head;
+ size_t tail;
+};
+
+class SharedRingBuffer {
+ public:
+ static void initialize_data(SharedRingBufferData* data)
+ {
+ data->pointer = InvalidDsaPointer;
+ data->total = 0;
+ data->head = 0;
+ data->tail = 0;
+ }
+
+ SharedRingBuffer(SharedRingBufferData* data, dsa_area* area)
+ : data_(data), area_(area)
+ {
+ }
+
+ void allocate(size_t total)
+ {
+ data_->pointer = dsa_allocate(area_, total);
+ data_->total = total;
+ data_->head = 0;
+ data_->tail = 0;
+ }
+
+ void free()
+ {
+ dsa_free(area_, data_->pointer);
+ initialize_data(data_);
+ }
+
+ size_t size() const
+ {
+ if (data_->head <= data_->tail)
+ {
+ return data_->tail - data_->head;
+ }
+ else
+ {
+ return (data_->total - data_->head) + data_->tail;
+ }
+ }
+
+ size_t rest_size() const { return data_->total - size() - 1; }
+
+ size_t write(const void* data, size_t n)
+ {
+ P("%s: %s: before: (%d:%d) %d", Tag, AFS_FUNC, data_->head,
data_->tail, n);
+ if (rest_size() == 0)
+ {
+ P("%s: %s: after: no space: (%d:%d) %d:0",
+ Tag,
+ AFS_FUNC,
+ data_->head,
+ data_->tail,
+ n);
+ return 0;
+ }
+
+ size_t writtenSize = 0;
+ auto output = address();
+ if (data_->head <= data_->tail)
+ {
+ auto restSize = data_->total - data_->tail;
+ if (data_->head == 0)
+ {
+ restSize--;
+ }
+ const auto firstHalfWriteSize = std::min(n, restSize);
+ P("%s: %s: first half: (%d:%d) %d:%d",
+ Tag,
+ AFS_FUNC,
+ data_->head,
+ data_->tail,
+ n,
+ firstHalfWriteSize);
+ memcpy(output + data_->tail, data, firstHalfWriteSize);
+ data_->tail = (data_->tail + firstHalfWriteSize) %
data_->total;
+ n -= firstHalfWriteSize;
+ writtenSize += firstHalfWriteSize;
+ }
+ if (n > 0 && rest_size() > 0)
+ {
+ const auto lastHalfWriteSize = std::min(n, data_->head
- data_->tail - 1);
+ P("%s: %s: last half: (%d:%d) %d:%d",
+ Tag,
+ AFS_FUNC,
+ data_->head,
+ data_->tail,
+ n,
+ lastHalfWriteSize);
+ memcpy(output + data_->tail,
+ static_cast<const uint8_t*>(data) + writtenSize,
+ lastHalfWriteSize);
+ data_->tail += lastHalfWriteSize;
+ n -= lastHalfWriteSize;
+ writtenSize += lastHalfWriteSize;
+ }
+ P("%s: %s: after: (%d:%d) %d:%d",
+ Tag,
+ AFS_FUNC,
+ data_->head,
+ data_->tail,
+ n,
+ writtenSize);
+ return writtenSize;
+ }
+
+ size_t read(size_t n, void* output)
+ {
+ P("%s: %s: before: (%d:%d) %d", Tag, AFS_FUNC, data_->head,
data_->tail, n);
+ size_t readSize = 0;
+ const auto input = address();
+ if (data_->head > data_->tail)
+ {
+ const auto firstHalfReadSize = std::min(n, data_->total
- data_->head);
+ P("%s: %s: first half: (%d:%d) %d:%d",
+ Tag,
+ AFS_FUNC,
+ data_->head,
+ data_->tail,
+ n,
+ firstHalfReadSize);
+ memcpy(output, input + data_->head, firstHalfReadSize);
+ data_->head = (data_->head + firstHalfReadSize) %
data_->total;
+ n -= firstHalfReadSize;
+ readSize += firstHalfReadSize;
+ }
+ if (n > 0 && data_->head != data_->tail)
+ {
+ const auto lastHalfReadSize = std::min(n, data_->tail -
data_->head);
+ P("%s: %s: last half: (%d:%d) %d:%d",
+ Tag,
+ AFS_FUNC,
+ data_->head,
+ data_->tail,
+ n,
+ lastHalfReadSize);
+ memcpy(static_cast<uint8_t*>(output) + readSize,
+ input + data_->head,
+ lastHalfReadSize);
+ data_->head += lastHalfReadSize;
+ n -= lastHalfReadSize;
+ readSize += lastHalfReadSize;
+ }
+ P("%s: %s: after: (%d:%d) %d:%d",
+ Tag,
+ AFS_FUNC,
+ data_->head,
+ data_->tail,
+ n,
+ readSize);
+ return readSize;
+ }
+
+ private:
+ SharedRingBufferData* data_;
+ dsa_area* area_;
+
+ uint8_t* address()
+ {
+ return static_cast<uint8_t*>(dsa_get_address(area_,
data_->pointer));
+ }
};
struct ExecuteData {
dsa_pointer query;
- Buffer buffer;
+ SharedRingBufferData bufferData;
};
struct SharedData {
dsa_handle handle;
- LWLock* lock;
pid_t executorPID;
pid_t serverPID;
pid_t mainPID;
@@ -131,15 +306,225 @@ struct SharedData {
class Processor {
public:
- Processor(const char* tag) : tag_(tag), sharedData_(nullptr),
area_(nullptr) {}
+ Processor(const char* tag)
+ : tag_(tag),
+ sharedData_(nullptr),
+ area_(nullptr),
+ lock_(),
+ mutex_(),
+ conditionVariable_()
+ {
+ }
virtual ~Processor() { dsa_detach(area_); }
+ const char* tag() { return tag_; }
+
+ SharedRingBuffer create_shared_ring_buffer()
+ {
+ return SharedRingBuffer(&(sharedData_->executeData.bufferData),
area_);
+ }
+
+ void lock_acquire(LWLockMode mode) { LWLockAcquire(lock_,
LW_EXCLUSIVE); }
+
+ void lock_release() { LWLockRelease(lock_); }
+
+ void wait_executor_written(SharedRingBuffer* buffer)
+ {
+ if (ARROW_PREDICT_FALSE(sharedData_->executorPID == InvalidPid))
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("%s: %s: executor isn't alive", Tag,
tag_));
+ }
+
+ P("%s: %s: %s: kill executor: %d", Tag, tag_, AFS_FUNC,
sharedData_->executorPID);
+ kill(sharedData_->executorPID, SIGUSR1);
+ auto size = buffer->size();
+ std::unique_lock<std::mutex> lock(mutex_);
+ conditionVariable_.wait(lock, [&] {
+ P("%s: %s: %s: wait: write: %d:%d",
+ Tag,
+ tag_,
+ AFS_FUNC,
+ buffer->size(),
+ size);
+ return buffer->size() != size;
+ });
+ }
+
+ void wait_server_read(SharedRingBuffer* buffer)
+ {
+ if (ARROW_PREDICT_FALSE(sharedData_->serverPID == InvalidPid))
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("%s: %s: server isn't alive", Tag,
tag_));
+ }
+
+ P("%s: %s: %s: kill server: %d", Tag, tag_, AFS_FUNC,
sharedData_->serverPID);
+ kill(sharedData_->serverPID, SIGUSR1);
+ auto restSize = buffer->rest_size();
+ while (true)
+ {
+ int events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+ WaitLatch(MyLatch, events, -1, PG_WAIT_EXTENSION);
+ if (GotSIGTERM)
+ {
+ break;
+ }
+ ResetLatch(MyLatch);
+
+ if (GotSIGUSR1)
+ {
+ GotSIGUSR1 = false;
+ P("%s: %s: %s: wait: read: %d:%d",
+ Tag,
+ tag_,
+ AFS_FUNC,
+ buffer->rest_size(),
+ restSize);
+ if (buffer->rest_size() != restSize)
+ {
+ break;
+ }
+ }
+
+ CHECK_FOR_INTERRUPTS();
+ }
+ }
+
+ void signaled()
+ {
+ P("%s: %s: signaled: before", Tag, tag_);
+ conditionVariable_.notify_all();
+ P("%s: %s: signaled: after", Tag, tag_);
+ }
+
protected:
const char* tag_;
SharedData* sharedData_;
dsa_area* area_;
LWLock* lock_;
+ std::mutex mutex_;
+ std::condition_variable conditionVariable_;
+};
+
+class SharedRingBufferInputStream : public arrow::io::InputStream {
+ public:
+ SharedRingBufferInputStream(Processor* processor)
+ : arrow::io::InputStream(), processor_(processor),
position_(0), is_open_(true)
+ {
+ }
+
+ arrow::Status Close() override
+ {
+ is_open_ = false;
+ return arrow::Status::OK();
+ }
+
+ bool closed() const override { return !is_open_; }
+
+ arrow::Result<int64_t> Tell() const override { return position_; }
+
+ arrow::Result<int64_t> Read(int64_t nBytes, void* out) override
+ {
+ if (ARROW_PREDICT_FALSE(!is_open_))
+ {
+ return arrow::Status::IOError(std::string(Tag) + ": " + processor_->tag() +
+ ": SharedRingBufferInputStream is closed");
+ }
+ auto buffer =
std::move(processor_->create_shared_ring_buffer());
+ size_t rest = static_cast<size_t>(nBytes);
+ while (true)
+ {
+ processor_->lock_acquire(LW_EXCLUSIVE);
+ auto readBytes = buffer.read(rest, out);
+ processor_->lock_release();
+
+ position_ += readBytes;
+ rest -= readBytes;
+ out = static_cast<uint8_t*>(out) + readBytes;
+ if (ARROW_PREDICT_TRUE(rest == 0))
+ {
+ break;
+ }
+
+ processor_->wait_executor_written(&buffer);
+ }
+ return nBytes;
+ }
+
+ arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nBytes)
override
+ {
+ ARROW_ASSIGN_OR_RAISE(auto buffer,
arrow::AllocateResizableBuffer(nBytes));
+ ARROW_ASSIGN_OR_RAISE(auto readBytes, Read(nBytes,
buffer->mutable_data()));
+ ARROW_RETURN_NOT_OK(buffer->Resize(readBytes, false));
+ buffer->ZeroPadding();
+ return std::move(buffer);
+ }
+
+ private:
+ Processor* processor_;
+ int64_t position_;
+ bool is_open_;
+};
+
+class SharedRingBufferOutputStream : public arrow::io::OutputStream {
+ public:
+ SharedRingBufferOutputStream(Processor* processor)
+ : arrow::io::OutputStream(), processor_(processor),
position_(0), is_open_(true)
+ {
+ }
+
+ arrow::Status Close() override
+ {
+ is_open_ = false;
+ return arrow::Status::OK();
+ }
+
+ bool closed() const override { return !is_open_; }
+
+ arrow::Result<int64_t> Tell() const override { return position_; }
+
+ arrow::Status Write(const void* data, int64_t nBytes) override
+ {
+ if (ARROW_PREDICT_FALSE(!is_open_))
+ {
+ return arrow::Status::IOError(std::string(Tag) + ": " + processor_->tag() +
+ ": SharedRingBufferOutputStream is closed");
+ }
+ if (ARROW_PREDICT_TRUE(nBytes > 0))
+ {
+ auto buffer =
std::move(processor_->create_shared_ring_buffer());
+ size_t rest = static_cast<size_t>(nBytes);
+ while (true)
+ {
+ processor_->lock_acquire(LW_EXCLUSIVE);
+ auto writtenSize = buffer.write(data, rest);
+ processor_->lock_release();
+
+ position_ += writtenSize;
+ rest -= writtenSize;
+ data = static_cast<const uint8_t*>(data) +
writtenSize;
+
+ if (ARROW_PREDICT_TRUE(rest == 0))
+ {
+ break;
+ }
+
+ processor_->wait_server_read(&buffer);
+ }
+ }
+ return arrow::Status::OK();
+ }
+
+ using arrow::io::OutputStream::Write;
+
+ private:
+ Processor* processor_;
+ int64_t position_;
+ bool is_open_;
};
class WorkerProcessor : public Processor {
@@ -153,7 +538,9 @@ class WorkerProcessor : public Processor {
if (!found)
{
LWLockRelease(AddinShmemInitLock);
- elog(ERROR, "%s: %s: shared data isn't created yet",
Tag, tag_);
+ ereport(ERROR,
+ errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("%s: %s: shared data isn't created yet",
Tag, tag_));
}
auto area = dsa_attach(sharedData->handle);
lock_ = &(GetNamedLWLockTranche(LWLockTrancheName)[0].lock);
@@ -180,78 +567,101 @@ class Executor : public WorkerProcessor {
unsetSharedString(sharedData_->connectData.databaseName);
unsetSharedString(sharedData_->connectData.userName);
unsetSharedString(sharedData_->connectData.password);
- // TODO: Customizable.
- sharedData_->executeData.buffer.total = 1L * 1024L;
- sharedData_->executeData.buffer.data =
- dsa_allocate(area_,
sharedData_->executeData.buffer.total);
- sharedData_->executeData.buffer.used = 0;
+ {
+ SharedRingBuffer
buffer(&(sharedData_->executeData.bufferData), area_);
+ // TODO: Customizable.
+ buffer.allocate(1L * 1024L * 1024L);
+ }
LWLockRelease(lock_);
StartTransactionCommand();
SPI_connect();
- PushActiveSnapshot(GetTransactionSnapshot());
pgstat_report_activity(STATE_IDLE, NULL);
}
void close()
{
pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": closing").c_str());
- PopActiveSnapshot();
SPI_finish();
CommitTransactionCommand();
LWLockAcquire(lock_, LW_EXCLUSIVE);
- dsa_free(area_, sharedData_->executeData.buffer.data);
- sharedData_->executeData.buffer.data = InvalidDsaPointer;
- sharedData_->executeData.buffer.total = 0;
- sharedData_->executeData.buffer.used = 0;
+ {
+ SharedRingBuffer
buffer(&(sharedData_->executeData.bufferData), area_);
+ buffer.free();
+ }
sharedData_->executorPID = InvalidPid;
LWLockRelease(lock_);
pgstat_report_activity(STATE_IDLE, NULL);
}
+ void signaled()
+ {
+ P("%s: %s: signaled: before: %d", Tag, tag_,
sharedData_->executeData.query);
+ P("signaled: before: %d", sharedData_->executeData.query);
+ if (DsaPointerIsValid(sharedData_->executeData.query))
+ {
+ execute();
+ }
+ else
+ {
+ Processor::signaled();
+ }
+ P("%s: %s: signaled: after: %d", Tag, tag_,
sharedData_->executeData.query);
+ }
+
+ private:
+ void unsetSharedString(dsa_pointer& pointer)
+ {
+ if (!DsaPointerIsValid(pointer))
+ {
+ return;
+ }
+ dsa_free(area_, pointer);
+ pointer = InvalidDsaPointer;
+ }
+
void execute()
{
pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": executing").c_str());
+
+ PushActiveSnapshot(GetTransactionSnapshot());
+
LWLockAcquire(lock_, LW_EXCLUSIVE);
auto query = static_cast<const char*>(
dsa_get_address(area_, sharedData_->executeData.query));
SetCurrentStatementStartTimestamp();
+ P("%s: %s: execute: %s", Tag, tag_, query);
auto result = SPI_execute(query, true, 0);
dsa_free(area_, sharedData_->executeData.query);
sharedData_->executeData.query = InvalidDsaPointer;
+ LWLockRelease(lock_);
+
if (result == SPI_OK_SELECT)
{
- auto bufferResult = write();
- if (bufferResult.ok())
+ pgstat_report_activity(STATE_RUNNING,
+ (std::string(Tag) + ": writing").c_str());
+ auto status = write();
+ if (!status.ok())
{
- auto buffer = *bufferResult;
- auto output =
- dsa_get_address(area_,
sharedData_->executeData.buffer.data);
- memcpy(output, buffer->data(), buffer->size());
- sharedData_->executeData.buffer.used =
buffer->size();
+ ereport(ERROR,
+ errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("%s: %s: failed to write", Tag,
tag_));
}
}
- LWLockRelease(lock_);
+
+ PopActiveSnapshot();
+
if (sharedData_->serverPID != InvalidPid)
{
+ P("%s: %s: kill server: %s", Tag, tag_,
sharedData_->serverPID);
kill(sharedData_->serverPID, SIGUSR1);
}
- pgstat_report_activity(STATE_IDLE, NULL);
- }
- private:
- void unsetSharedString(dsa_pointer& pointer)
- {
- if (!DsaPointerIsValid(pointer))
- {
- return;
- }
- dsa_free(area_, pointer);
- pointer = InvalidDsaPointer;
+ pgstat_report_activity(STATE_IDLE, NULL);
}
- arrow::Result<std::shared_ptr<arrow::Buffer>> write()
+ arrow::Status write()
{
- ARROW_ASSIGN_OR_RAISE(auto output,
arrow::io::BufferOutputStream::Create());
+ SharedRingBufferOutputStream output(this);
std::vector<std::shared_ptr<arrow::Field>> fields;
for (int i = 0; i < SPI_tuptable->tupdesc->natts; ++i)
{
@@ -270,18 +680,48 @@ class Executor : public WorkerProcessor {
arrow::field(NameStr(attribute->attname), type,
!attribute->attnotnull));
}
auto schema = arrow::schema(fields);
- auto option = arrow::ipc::IpcWriteOptions::Defaults();
- option.emit_dictionary_deltas = true;
- ARROW_ASSIGN_OR_RAISE(auto writer,
- arrow::ipc::MakeStreamWriter(output,
schema, option));
ARROW_ASSIGN_OR_RAISE(
auto builder,
arrow::RecordBatchBuilder::Make(schema,
arrow::default_memory_pool()));
+ auto option = arrow::ipc::IpcWriteOptions::Defaults();
+ option.emit_dictionary_deltas = true;
+
+ // Write schema only stream format data to return only schema.
+ ARROW_ASSIGN_OR_RAISE(auto writer,
+ arrow::ipc::MakeStreamWriter(&output,
schema, option));
+ // Build 1 null row to write schema.
+ for (uint64_t iAttribute = 0; iAttribute <
SPI_tuptable->tupdesc->natts;
+ ++iAttribute)
+ {
+ auto arrayBuilder = builder->GetField(iAttribute);
+ ARROW_RETURN_NOT_OK(arrayBuilder->AppendNull());
+ }
+ ARROW_ASSIGN_OR_RAISE(auto recordBatch, builder->Flush());
+ P("%s: %s: write: schema: WriteRecordBatch", Tag, tag_);
+ ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
+ P("%s: %s: write: schema: Close", Tag, tag_);
+ ARROW_RETURN_NOT_OK(writer->Close());
+
+ // Write another stream format data with record batches.
+ ARROW_ASSIGN_OR_RAISE(writer,
+ arrow::ipc::MakeStreamWriter(&output,
schema, option));
for (uint64_t iTuple = 0; iTuple < SPI_processed; ++iTuple)
{
+ P("%s: %s: write: data: record batch: %d/%d",
+ Tag,
+ tag_,
+ iTuple,
+ SPI_processed);
for (uint64_t iAttribute = 0; iAttribute <
SPI_tuptable->tupdesc->natts;
++iAttribute)
{
+ P("%s: %s: write: data: record batch: %d/%d: %d/%d",
+ Tag,
+ tag_,
+ iTuple,
+ SPI_processed,
+ iAttribute,
+ SPI_tuptable->tupdesc->natts);
bool isNull;
auto datum =
SPI_getbinval(SPI_tuptable->vals[iTuple],
SPI_tuptable->tupdesc,
@@ -300,10 +740,12 @@ class Executor : public WorkerProcessor {
}
}
}
- ARROW_ASSIGN_OR_RAISE(auto recordBatch, builder->Flush());
+ ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
+ P("%s: %s: write: data: WriteRecordBatch", Tag, tag_);
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
+ P("%s: %s: write: data: Close", Tag, tag_);
ARROW_RETURN_NOT_OK(writer->Close());
- return output->Finish();
+ return output.Close();
}
};
@@ -325,43 +767,50 @@ class Proxy : public WorkerProcessor {
setSharedString(sharedData_->connectData.password, password);
LWLockRelease(lock_);
kill(sharedData_->mainPID, SIGUSR1);
- std::unique_lock<std::mutex> lock(mutex_);
- condition_variable_.wait(lock,
- [&] { return sharedData_->executorPID
!= InvalidPid; });
+ {
+ std::unique_lock<std::mutex> lock(mutex_);
+ conditionVariable_.wait(
+ lock, [&] { return sharedData_->executorPID !=
InvalidPid; });
+ }
return arrow::Status::OK();
}
- arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> execute(
- const std::string& query)
+ arrow::Result<std::shared_ptr<arrow::Schema>> execute(const
std::string& query)
{
LWLockAcquire(lock_, LW_EXCLUSIVE);
setSharedString(sharedData_->executeData.query, query);
LWLockRelease(lock_);
if (sharedData_->executorPID != InvalidPid)
{
+ P("%s: %s: execute: kill executor: %d", Tag, tag_,
sharedData_->executorPID);
kill(sharedData_->executorPID, SIGUSR1);
}
- std::unique_lock<std::mutex> lock(mutex_);
- condition_variable_.wait(
- lock, [&] { return sharedData_->executeData.buffer.used
!= 0; });
- return read();
+ P("%s: %s: execute: open", Tag, tag_);
+ auto input =
std::make_shared<SharedRingBufferInputStream>(this);
+ // Read schema only stream format data.
+ ARROW_ASSIGN_OR_RAISE(auto reader,
+
arrow::ipc::RecordBatchStreamReader::Open(input));
+ while (true)
+ {
+ std::shared_ptr<arrow::RecordBatch> recordBatch;
+ P("%s: %s: execute: read next", Tag, tag_);
+ ARROW_RETURN_NOT_OK(reader->ReadNext(&recordBatch));
+ if (!recordBatch)
+ {
+ break;
+ }
+ }
+ P("%s: %s: execute: schema", Tag, tag_);
+ return reader->schema();
}
arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> read()
{
- auto input = std::make_shared<arrow::io::BufferReader>(
- static_cast<const uint8_t*>(
- dsa_get_address(area_,
sharedData_->executeData.buffer.data)),
- sharedData_->executeData.buffer.used);
+ auto input =
std::make_shared<SharedRingBufferInputStream>(this);
+ // Read another stream format data with record batches.
return arrow::ipc::RecordBatchStreamReader::Open(input);
}
- void signaled()
- {
- std::lock_guard<std::mutex> lock(mutex_);
- condition_variable_.notify_all();
- }
-
private:
void setSharedString(dsa_pointer& pointer, const std::string& input)
{
@@ -372,9 +821,6 @@ class Proxy : public WorkerProcessor {
pointer = dsa_allocate(area_, input.size() + 1);
memcpy(dsa_get_address(area_, pointer), input.c_str(),
input.size() + 1);
}
-
- std::mutex mutex_;
- std::condition_variable condition_variable_;
};
class MainProcessor : public Processor {
@@ -388,7 +834,9 @@ class MainProcessor : public Processor {
if (found)
{
LWLockRelease(AddinShmemInitLock);
- elog(ERROR, "%s: %s: shared data is already created",
Tag, tag_);
+ ereport(ERROR,
+ errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("%s: %s: shared data is already created", Tag, tag_));
}
auto area = dsa_create(LWLockNewTrancheId());
sharedData->handle = dsa_get_handle(area);
@@ -398,9 +846,7 @@ class MainProcessor : public Processor {
sharedData->connectData.databaseName = InvalidDsaPointer;
sharedData->connectData.userName = InvalidDsaPointer;
sharedData->connectData.password = InvalidDsaPointer;
- sharedData->executeData.buffer.data = InvalidDsaPointer;
- sharedData->executeData.buffer.total = 0;
- sharedData->executeData.buffer.used = 0;
+
SharedRingBuffer::initialize_data(&(sharedData->executeData.bufferData));
lock_ = &(GetNamedLWLockTranche(LWLockTrancheName)[0].lock);
LWLockRelease(AddinShmemInitLock);
sharedData_ = sharedData;
@@ -422,7 +868,9 @@ class MainProcessor : public Processor {
BackgroundWorkerHandle* handle;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
{
- elog(ERROR, "%s: %s: failed to start server", Tag,
tag_);
+ ereport(ERROR,
+ errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("%s: %s: failed to start server", Tag,
tag_));
}
WaitForBackgroundWorkerStartup(handle,
&(sharedData_->serverPID));
return handle;
@@ -449,7 +897,9 @@ class MainProcessor : public Processor {
BackgroundWorkerHandle* handle;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
{
- elog(ERROR, "%s: %s: failed to start executor", Tag,
tag_);
+ ereport(ERROR,
+ errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("%s: %s: failed to start executor", Tag,
tag_));
}
WaitForBackgroundWorkerStartup(handle,
&(sharedData_->executorPID));
kill(sharedData_->serverPID, SIGUSR1);
@@ -514,8 +964,7 @@ class FlightSQLServer : public
arrow::flight::sql::FlightSqlServerBase {
const arrow::flight::FlightDescriptor& descriptor)
{
const auto& query = command.query;
- ARROW_ASSIGN_OR_RAISE(auto reader, proxy_->execute(query));
- auto schema = reader->schema();
+ ARROW_ASSIGN_OR_RAISE(auto schema, proxy_->execute(query));
ARROW_ASSIGN_OR_RAISE(auto ticket,
arrow::flight::sql::CreateStatementQueryTicket(query));
std::vector<arrow::flight::FlightEndpoint> endpoints{
@@ -562,6 +1011,7 @@ afs_server_internal(Proxy* proxy)
CHECK_FOR_INTERRUPTS();
}
+ // TODO: Use before_shmem_exit()
auto deadline = std::chrono::system_clock::now() +
std::chrono::microseconds(10);
return flightSQLServer.Shutdown(&deadline);
}
@@ -586,7 +1036,7 @@ afs_executor(Datum arg)
{
events |= WL_TIMEOUT;
}
- int conditions = WaitLatch(MyLatch, events, timeout,
PG_WAIT_EXTENSION);
+ auto conditions = WaitLatch(MyLatch, events, timeout,
PG_WAIT_EXTENSION);
if (conditions & WL_TIMEOUT)
{
@@ -598,11 +1048,12 @@ afs_executor(Datum arg)
if (GotSIGUSR1)
{
GotSIGUSR1 = false;
- executor.execute();
+ executor.signaled();
}
CHECK_FOR_INTERRUPTS();
}
+ // TODO: Use before_shmem_exit()
executor.close();
}
@@ -617,11 +1068,16 @@ afs_server(Datum arg)
BackgroundWorkerUnblockSignals();
{
- Proxy proxy;
- auto status = afs_server_internal(&proxy);
+ arrow::Status status;
+ {
+ Proxy proxy;
+ status = afs_server_internal(&proxy);
+ }
if (!status.ok())
{
- elog(ERROR, "%s: server: failed: %s", Tag,
status.ToString().c_str());
+ ereport(ERROR,
+ errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("%s: server: failed: %s", Tag,
status.ToString().c_str()));
}
}
@@ -651,7 +1107,6 @@ afs_main(Datum arg)
CHECK_FOR_INTERRUPTS();
}
-
WaitForBackgroundWorkerShutdown(serverHandle);
}