This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/arrow-flight-sql-postgresql.git


The following commit(s) were added to refs/heads/main by this push:
     new 7bbb79f  Add support for large response (#24)
7bbb79f is described below

commit 7bbb79f6a259a3e4d9f2953b9f76728f937890f4
Author: Sutou Kouhei <[email protected]>
AuthorDate: Wed Mar 1 08:38:02 2023 +0900

    Add support for large response (#24)
    
    Closes GH-21
---
 CMakeLists.txt |   2 +
 src/afs.cc     | 617 +++++++++++++++++++++++++++++++++++++++++++++++++--------
 2 files changed, 538 insertions(+), 81 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5e370af..0799936 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -85,6 +85,8 @@ find_package(ArrowFlightSql REQUIRED)
 
 add_library(arrow_flight_sql MODULE ${AFS_SOURCES})
 set_target_properties(arrow_flight_sql PROPERTIES PREFIX "")
+target_compile_definitions(arrow_flight_sql
+                           PRIVATE "$<$<CONFIG:Debug>:AFS_DEBUG>")
 target_link_libraries(arrow_flight_sql postgresql
                       ArrowFlightSql::arrow_flight_sql_shared)
 execute_process(
diff --git a/src/afs.cc b/src/afs.cc
index f070974..79597a2 100644
--- a/src/afs.cc
+++ b/src/afs.cc
@@ -51,6 +51,18 @@ extern "C"
 #include <condition_variable>
 #include <sstream>
 
+#ifdef __GNUC__
+#define AFS_FUNC __PRETTY_FUNCTION__
+#else
+#define AFS_FUNC __func__
+#endif
+
+#ifdef AFS_DEBUG
+#define P(...) ereport(DEBUG5, errmsg_internal(__VA_ARGS__))
+#else
+#define P(...)
+#endif
+
 extern "C"
 {
        PG_MODULE_MAGIC;
@@ -108,20 +120,183 @@ struct ConnectData {
        dsa_pointer password;
 };
 
-struct Buffer {
-       dsa_pointer data;
+struct SharedRingBufferData {
+       dsa_pointer pointer;
        size_t total;
-       size_t used;
+       size_t head;
+       size_t tail;
+};
+
+// View over a byte ring buffer whose bookkeeping (SharedRingBufferData) and
+// storage are both reached through a DSA area.
+//
+// head == tail means "empty", so one slot is always left unused and the
+// usable capacity is total - 1 bytes. No method here takes a lock.
+class SharedRingBuffer {
+   public:
+       // Puts `data` into the "nothing allocated" state.
+       static void initialize_data(SharedRingBufferData* data)
+       {
+               data->pointer = InvalidDsaPointer;
+               data->total = 0;
+               data->head = 0;
+               data->tail = 0;
+       }
+
+       SharedRingBuffer(SharedRingBufferData* data, dsa_area* area)
+               : data_(data), area_(area)
+       {
+       }
+
+       // Allocates `total` bytes of storage in the DSA area and empties the buffer.
+       void allocate(size_t total)
+       {
+               data_->pointer = dsa_allocate(area_, total);
+               data_->total = total;
+               data_->head = 0;
+               data_->tail = 0;
+       }
+
+       // Releases the storage, if any, and resets the bookkeeping.
+       void free()
+       {
+               if (DsaPointerIsValid(data_->pointer))
+               {
+                       dsa_free(area_, data_->pointer);
+               }
+               initialize_data(data_);
+       }
+
+       // Number of bytes currently stored.
+       size_t size() const
+       {
+               if (data_->head <= data_->tail)
+               {
+                       return data_->tail - data_->head;
+               }
+               else
+               {
+                       return (data_->total - data_->head) + data_->tail;
+               }
+       }
+
+       // Number of bytes that can still be written. An unallocated buffer
+       // (total == 0) has no room; without this guard the unsigned subtraction
+       // below would wrap around to a huge value.
+       size_t rest_size() const
+       {
+               if (data_->total == 0)
+               {
+                       return 0;
+               }
+               return data_->total - size() - 1;
+       }
+
+       // Copies up to `n` bytes from `data` into the buffer and returns how many
+       // bytes were actually stored (0 when the buffer is full).
+       size_t write(const void* data, size_t n)
+       {
+               P("%s: %s: before: (%zu:%zu) %zu", Tag, AFS_FUNC, data_->head, data_->tail, n);
+               if (rest_size() == 0)
+               {
+                       P("%s: %s: after: no space: (%zu:%zu) %zu:0",
+                         Tag,
+                         AFS_FUNC,
+                         data_->head,
+                         data_->tail,
+                         n);
+                       return 0;
+               }
+
+               size_t writtenSize = 0;
+               auto output = address();
+               if (data_->head <= data_->tail)
+               {
+                       // Free space runs from tail to the end of the storage.
+                       auto restSize = data_->total - data_->tail;
+                       if (data_->head == 0)
+                       {
+                               // Filling the last slot would wrap tail onto head and
+                               // make a full buffer look empty.
+                               restSize--;
+                       }
+                       const auto firstHalfWriteSize = std::min(n, restSize);
+                       P("%s: %s: first half: (%zu:%zu) %zu:%zu",
+                         Tag,
+                         AFS_FUNC,
+                         data_->head,
+                         data_->tail,
+                         n,
+                         firstHalfWriteSize);
+                       memcpy(output + data_->tail, data, firstHalfWriteSize);
+                       data_->tail = (data_->tail + firstHalfWriteSize) % data_->total;
+                       n -= firstHalfWriteSize;
+                       writtenSize += firstHalfWriteSize;
+               }
+               if (n > 0 && rest_size() > 0)
+               {
+                       // tail is now behind head: fill the gap up to head - 1.
+                       const auto lastHalfWriteSize = std::min(n, data_->head - data_->tail - 1);
+                       P("%s: %s: last half: (%zu:%zu) %zu:%zu",
+                         Tag,
+                         AFS_FUNC,
+                         data_->head,
+                         data_->tail,
+                         n,
+                         lastHalfWriteSize);
+                       memcpy(output + data_->tail,
+                              static_cast<const uint8_t*>(data) + writtenSize,
+                              lastHalfWriteSize);
+                       data_->tail += lastHalfWriteSize;
+                       n -= lastHalfWriteSize;
+                       writtenSize += lastHalfWriteSize;
+               }
+               P("%s: %s: after: (%zu:%zu) %zu:%zu",
+                 Tag,
+                 AFS_FUNC,
+                 data_->head,
+                 data_->tail,
+                 n,
+                 writtenSize);
+               return writtenSize;
+       }
+
+       // Moves up to `n` stored bytes into `output` and returns how many bytes
+       // were actually copied (0 when the buffer is empty).
+       size_t read(size_t n, void* output)
+       {
+               P("%s: %s: before: (%zu:%zu) %zu", Tag, AFS_FUNC, data_->head, data_->tail, n);
+               size_t readSize = 0;
+               const auto input = address();
+               if (data_->head > data_->tail)
+               {
+                       // Stored data wraps: first consume from head to the end.
+                       const auto firstHalfReadSize = std::min(n, data_->total - data_->head);
+                       P("%s: %s: first half: (%zu:%zu) %zu:%zu",
+                         Tag,
+                         AFS_FUNC,
+                         data_->head,
+                         data_->tail,
+                         n,
+                         firstHalfReadSize);
+                       memcpy(output, input + data_->head, firstHalfReadSize);
+                       data_->head = (data_->head + firstHalfReadSize) % data_->total;
+                       n -= firstHalfReadSize;
+                       readSize += firstHalfReadSize;
+               }
+               if (n > 0 && data_->head != data_->tail)
+               {
+                       const auto lastHalfReadSize = std::min(n, data_->tail - data_->head);
+                       P("%s: %s: last half: (%zu:%zu) %zu:%zu",
+                         Tag,
+                         AFS_FUNC,
+                         data_->head,
+                         data_->tail,
+                         n,
+                         lastHalfReadSize);
+                       memcpy(static_cast<uint8_t*>(output) + readSize,
+                              input + data_->head,
+                              lastHalfReadSize);
+                       data_->head += lastHalfReadSize;
+                       n -= lastHalfReadSize;
+                       readSize += lastHalfReadSize;
+               }
+               P("%s: %s: after: (%zu:%zu) %zu:%zu",
+                 Tag,
+                 AFS_FUNC,
+                 data_->head,
+                 data_->tail,
+                 n,
+                 readSize);
+               return readSize;
+       }
+
+   private:
+       SharedRingBufferData* data_;
+       dsa_area* area_;
+
+       // Resolves the DSA pointer to an address usable in this process.
+       uint8_t* address()
+       {
+               return static_cast<uint8_t*>(dsa_get_address(area_, data_->pointer));
+       }
 };
 
 struct ExecuteData {
        dsa_pointer query;
-       Buffer buffer;
+       SharedRingBufferData bufferData;
 };
 
 struct SharedData {
        dsa_handle handle;
-       LWLock* lock;
        pid_t executorPID;
        pid_t serverPID;
        pid_t mainPID;
@@ -131,15 +306,225 @@ struct SharedData {
 
 class Processor {
    public:
-       Processor(const char* tag) : tag_(tag), sharedData_(nullptr), area_(nullptr) {}
+       // Subclasses attach the shared state; everything starts out null.
+       Processor(const char* tag)
+               : tag_(tag),
+                 sharedData_(nullptr),
+                 area_(nullptr),
+                 lock_(nullptr),
+                 mutex_(),
+                 conditionVariable_()
+       {
+       }
 
        virtual ~Processor() { dsa_detach(area_); }
 
+       const char* tag() { return tag_; }
+
+       // Returns a view over the ring buffer stored in the shared execute data.
+       SharedRingBuffer create_shared_ring_buffer()
+       {
+               return SharedRingBuffer(&(sharedData_->executeData.bufferData), area_);
+       }
+
+       // Acquires the shared LWLock in the mode the caller asked for.
+       void lock_acquire(LWLockMode mode) { LWLockAcquire(lock_, mode); }
+
+       void lock_release() { LWLockRelease(lock_); }
+
+       // Signals the executor (SIGUSR1) and blocks on the condition variable
+       // until the ring buffer holds data.
+       //
+       // The predicate is "buffer is non-empty" rather than "size differs from
+       // a snapshot": the snapshot was taken after the caller had released the
+       // lock, so data written in between made the wait miss the change.
+       // NOTE(review): relies on the caller having drained the buffer first,
+       // as SharedRingBufferInputStream::Read() does.
+       void wait_executor_written(SharedRingBuffer* buffer)
+       {
+               if (ARROW_PREDICT_FALSE(sharedData_->executorPID == InvalidPid))
+               {
+                       ereport(ERROR,
+                               errcode(ERRCODE_INTERNAL_ERROR),
+                               errmsg("%s: %s: executor isn't alive", Tag, tag_));
+               }
+
+               P("%s: %s: %s: kill executor: %d", Tag, tag_, AFS_FUNC, sharedData_->executorPID);
+               kill(sharedData_->executorPID, SIGUSR1);
+               std::unique_lock<std::mutex> lock(mutex_);
+               conditionVariable_.wait(lock, [&] {
+                       const auto size = buffer->size();
+                       P("%s: %s: %s: wait: write: %zu", Tag, tag_, AFS_FUNC, size);
+                       return size > 0;
+               });
+       }
+
+       // Signals the server (SIGUSR1) and waits on the process latch until the
+       // ring buffer has free space again, or SIGTERM was received.
+       //
+       // As above, the exit condition is absolute ("has space") instead of a
+       // comparison with a snapshot taken after the lock was released.
+       void wait_server_read(SharedRingBuffer* buffer)
+       {
+               if (ARROW_PREDICT_FALSE(sharedData_->serverPID == InvalidPid))
+               {
+                       ereport(ERROR,
+                               errcode(ERRCODE_INTERNAL_ERROR),
+                               errmsg("%s: %s: server isn't alive", Tag, tag_));
+               }
+
+               P("%s: %s: %s: kill server: %d", Tag, tag_, AFS_FUNC, sharedData_->serverPID);
+               kill(sharedData_->serverPID, SIGUSR1);
+               while (true)
+               {
+                       int events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+                       WaitLatch(MyLatch, events, -1, PG_WAIT_EXTENSION);
+                       if (GotSIGTERM)
+                       {
+                               break;
+                       }
+                       ResetLatch(MyLatch);
+
+                       if (GotSIGUSR1)
+                       {
+                               GotSIGUSR1 = false;
+                               const auto restSize = buffer->rest_size();
+                               P("%s: %s: %s: wait: read: %zu", Tag, tag_, AFS_FUNC, restSize);
+                               if (restSize > 0)
+                               {
+                                       break;
+                               }
+                       }
+
+                       CHECK_FOR_INTERRUPTS();
+               }
+       }
+
+       // Wakes every thread blocked in wait_executor_written().
+       void signaled()
+       {
+               P("%s: %s: signaled: before", Tag, tag_);
+               {
+                       // Holding the mutex keeps the notification from landing
+                       // between a waiter's predicate check and its going to sleep.
+                       std::lock_guard<std::mutex> lock(mutex_);
+                       conditionVariable_.notify_all();
+               }
+               P("%s: %s: signaled: after", Tag, tag_);
+       }
+
    protected:
        const char* tag_;
        SharedData* sharedData_;
        dsa_area* area_;
        LWLock* lock_;
+       std::mutex mutex_;
+       std::condition_variable conditionVariable_;
+};
+
+// arrow::io::InputStream that pulls bytes out of the Processor's shared ring
+// buffer, asking the executor for more whenever the buffer cannot satisfy
+// the request.
+class SharedRingBufferInputStream : public arrow::io::InputStream {
+   public:
+       SharedRingBufferInputStream(Processor* processor)
+               : arrow::io::InputStream(), processor_(processor), position_(0), is_open_(true)
+       {
+       }
+
+       arrow::Status Close() override
+       {
+               is_open_ = false;
+               return arrow::Status::OK();
+       }
+
+       bool closed() const override { return !is_open_; }
+
+       arrow::Result<int64_t> Tell() const override { return position_; }
+
+       // Reads exactly `nBytes` bytes into `out`, blocking in
+       // Processor::wait_executor_written() until they are all available.
+       // Fails when the stream is closed or `nBytes` is negative.
+       arrow::Result<int64_t> Read(int64_t nBytes, void* out) override
+       {
+               if (ARROW_PREDICT_FALSE(!is_open_))
+               {
+                       return arrow::Status::IOError(std::string(Tag) + ": " + processor_->tag() +
+                                                     ": SharedRingBufferInputStream is closed");
+               }
+               if (ARROW_PREDICT_FALSE(nBytes < 0))
+               {
+                       // A negative count would turn into a huge size_t below and
+                       // the loop would never finish.
+                       return arrow::Status::Invalid(std::string(Tag) + ": " + processor_->tag() +
+                                                     ": SharedRingBufferInputStream: negative read size");
+               }
+               auto buffer = processor_->create_shared_ring_buffer();
+               size_t rest = static_cast<size_t>(nBytes);
+               while (true)
+               {
+                       processor_->lock_acquire(LW_EXCLUSIVE);
+                       auto readBytes = buffer.read(rest, out);
+                       processor_->lock_release();
+
+                       position_ += readBytes;
+                       rest -= readBytes;
+                       out = static_cast<uint8_t*>(out) + readBytes;
+                       if (ARROW_PREDICT_TRUE(rest == 0))
+                       {
+                               break;
+                       }
+
+                       processor_->wait_executor_written(&buffer);
+               }
+               return nBytes;
+       }
+
+       // Same as above, but returns the bytes in a newly allocated buffer.
+       arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nBytes) override
+       {
+               ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nBytes));
+               ARROW_ASSIGN_OR_RAISE(auto readBytes, Read(nBytes, buffer->mutable_data()));
+               ARROW_RETURN_NOT_OK(buffer->Resize(readBytes, false));
+               buffer->ZeroPadding();
+               return std::move(buffer);
+       }
+
+   private:
+       Processor* processor_;
+       int64_t position_;
+       bool is_open_;
+};
+
+// arrow::io::OutputStream that pushes bytes into the Processor's shared ring
+// buffer, waiting for the server to drain it whenever it fills up.
+class SharedRingBufferOutputStream : public arrow::io::OutputStream {
+   public:
+       SharedRingBufferOutputStream(Processor* processor)
+               : arrow::io::OutputStream(), processor_(processor), position_(0), is_open_(true)
+       {
+       }
+
+       arrow::Status Close() override
+       {
+               is_open_ = false;
+               return arrow::Status::OK();
+       }
+
+       bool closed() const override { return !is_open_; }
+
+       arrow::Result<int64_t> Tell() const override { return position_; }
+
+       // Writes all `nBytes` bytes of `data`, blocking in
+       // Processor::wait_server_read() while the buffer is full.
+       // Fails when the stream is closed or SIGTERM arrives while waiting.
+       arrow::Status Write(const void* data, int64_t nBytes) override
+       {
+               if (ARROW_PREDICT_FALSE(!is_open_))
+               {
+                       return arrow::Status::IOError(std::string(Tag) + ": " + processor_->tag() +
+                                                     ": SharedRingBufferOutputStream is closed");
+               }
+               if (ARROW_PREDICT_TRUE(nBytes > 0))
+               {
+                       auto buffer = processor_->create_shared_ring_buffer();
+                       size_t rest = static_cast<size_t>(nBytes);
+                       while (true)
+                       {
+                               processor_->lock_acquire(LW_EXCLUSIVE);
+                               auto writtenSize = buffer.write(data, rest);
+                               processor_->lock_release();
+
+                               position_ += writtenSize;
+                               rest -= writtenSize;
+                               data = static_cast<const uint8_t*>(data) + writtenSize;
+
+                               if (ARROW_PREDICT_TRUE(rest == 0))
+                               {
+                                       break;
+                               }
+
+                               processor_->wait_server_read(&buffer);
+                               // wait_server_read() returns immediately, without
+                               // resetting the latch, once SIGTERM was seen; looping
+                               // again on a full buffer would then spin forever.
+                               // NOTE(review): assumes GotSIGTERM is the same
+                               // file-level flag that wait_server_read() tests.
+                               if (GotSIGTERM)
+                               {
+                                       return arrow::Status::IOError(
+                                               std::string(Tag) + ": " + processor_->tag() +
+                                               ": SharedRingBufferOutputStream: interrupted by SIGTERM");
+                               }
+                       }
+               }
+               return arrow::Status::OK();
+       }
+
+       using arrow::io::OutputStream::Write;
+
+   private:
+       Processor* processor_;
+       int64_t position_;
+       bool is_open_;
 };
 
 class WorkerProcessor : public Processor {
@@ -153,7 +538,9 @@ class WorkerProcessor : public Processor {
                if (!found)
                {
                        LWLockRelease(AddinShmemInitLock);
-                       elog(ERROR, "%s: %s: shared data isn't created yet", 
Tag, tag_);
+                       ereport(ERROR,
+                               errcode(ERRCODE_INTERNAL_ERROR),
+                               errmsg("%s: %s: shared data isn't created yet", 
Tag, tag_));
                }
                auto area = dsa_attach(sharedData->handle);
                lock_ = &(GetNamedLWLockTranche(LWLockTrancheName)[0].lock);
@@ -180,78 +567,101 @@ class Executor : public WorkerProcessor {
                unsetSharedString(sharedData_->connectData.databaseName);
                unsetSharedString(sharedData_->connectData.userName);
                unsetSharedString(sharedData_->connectData.password);
-               // TODO: Customizable.
-               sharedData_->executeData.buffer.total = 1L * 1024L;
-               sharedData_->executeData.buffer.data =
-                       dsa_allocate(area_, 
sharedData_->executeData.buffer.total);
-               sharedData_->executeData.buffer.used = 0;
+               {
+                       SharedRingBuffer 
buffer(&(sharedData_->executeData.bufferData), area_);
+                       // TODO: Customizable.
+                       buffer.allocate(1L * 1024L * 1024L);
+               }
                LWLockRelease(lock_);
                StartTransactionCommand();
                SPI_connect();
-               PushActiveSnapshot(GetTransactionSnapshot());
                pgstat_report_activity(STATE_IDLE, NULL);
        }
 
        void close()
        {
                pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": 
closing").c_str());
-               PopActiveSnapshot();
                SPI_finish();
                CommitTransactionCommand();
                LWLockAcquire(lock_, LW_EXCLUSIVE);
-               dsa_free(area_, sharedData_->executeData.buffer.data);
-               sharedData_->executeData.buffer.data = InvalidDsaPointer;
-               sharedData_->executeData.buffer.total = 0;
-               sharedData_->executeData.buffer.used = 0;
+               {
+                       SharedRingBuffer 
buffer(&(sharedData_->executeData.bufferData), area_);
+                       buffer.free();
+               }
                sharedData_->executorPID = InvalidPid;
                LWLockRelease(lock_);
                pgstat_report_activity(STATE_IDLE, NULL);
        }
 
+       // Handles a SIGUSR1 wake-up: runs the pending query when one is stored
+       // in shared memory, otherwise forwards to Processor::signaled().
+       void signaled()
+       {
+               // dsa_pointer is not an int; print it with the format macro that
+               // is defined alongside the type.
+               P("%s: %s: signaled: before: " DSA_POINTER_FORMAT,
+                 Tag,
+                 tag_,
+                 sharedData_->executeData.query);
+               if (DsaPointerIsValid(sharedData_->executeData.query))
+               {
+                       execute();
+               }
+               else
+               {
+                       Processor::signaled();
+               }
+               P("%s: %s: signaled: after: " DSA_POINTER_FORMAT,
+                 Tag,
+                 tag_,
+                 sharedData_->executeData.query);
+       }
+
+   private:
+       // Frees the DSA allocation behind `pointer`, if there is one, and marks
+       // the pointer as invalid.
+       void unsetSharedString(dsa_pointer& pointer)
+       {
+               if (DsaPointerIsValid(pointer))
+               {
+                       dsa_free(area_, pointer);
+                       pointer = InvalidDsaPointer;
+               }
+       }
+
        void execute()
        {
+               // Runs the query stored in shared memory through SPI, streams a
+               // SELECT result with write(), then signals the server.
                pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": executing").c_str());
+
+               PushActiveSnapshot(GetTransactionSnapshot());
+
                LWLockAcquire(lock_, LW_EXCLUSIVE);
                auto query = static_cast<const char*>(
                        dsa_get_address(area_, sharedData_->executeData.query));
                SetCurrentStatementStartTimestamp();
+               P("%s: %s: execute: %s", Tag, tag_, query);
                auto result = SPI_execute(query, true, 0);
                dsa_free(area_, sharedData_->executeData.query);
                sharedData_->executeData.query = InvalidDsaPointer;
+               // Released before write(): the output stream takes this lock for
+               // each chunk it writes.
+               LWLockRelease(lock_);
+
                if (result == SPI_OK_SELECT)
                {
-                       auto bufferResult = write();
-                       if (bufferResult.ok())
+                       pgstat_report_activity(STATE_RUNNING,
+                                              (std::string(Tag) + ": writing").c_str());
+                       auto status = write();
+                       if (!status.ok())
                        {
-                               auto buffer = *bufferResult;
-                               auto output =
-                                       dsa_get_address(area_, sharedData_->executeData.buffer.data);
-                               memcpy(output, buffer->data(), buffer->size());
-                               sharedData_->executeData.buffer.used = buffer->size();
+                               // Include the Arrow status text so the cause is not lost.
+                               ereport(ERROR,
+                                       errcode(ERRCODE_INTERNAL_ERROR),
+                                       errmsg("%s: %s: failed to write: %s",
+                                              Tag,
+                                              tag_,
+                                              status.ToString().c_str()));
                        }
                }
-               LWLockRelease(lock_);
+
+               PopActiveSnapshot();
+
                if (sharedData_->serverPID != InvalidPid)
                {
+                       // The PID is an integer: %d, not %s.
+                       P("%s: %s: kill server: %d", Tag, tag_, sharedData_->serverPID);
                        kill(sharedData_->serverPID, SIGUSR1);
                }
-               pgstat_report_activity(STATE_IDLE, NULL);
-       }
 
-   private:
-       void unsetSharedString(dsa_pointer& pointer)
-       {
-               if (!DsaPointerIsValid(pointer))
-               {
-                       return;
-               }
-               dsa_free(area_, pointer);
-               pointer = InvalidDsaPointer;
+               pgstat_report_activity(STATE_IDLE, NULL);
        }
 
-       arrow::Result<std::shared_ptr<arrow::Buffer>> write()
+       arrow::Status write()
        {
-               ARROW_ASSIGN_OR_RAISE(auto output, 
arrow::io::BufferOutputStream::Create());
+               SharedRingBufferOutputStream output(this);
                std::vector<std::shared_ptr<arrow::Field>> fields;
                for (int i = 0; i < SPI_tuptable->tupdesc->natts; ++i)
                {
@@ -270,18 +680,48 @@ class Executor : public WorkerProcessor {
                                arrow::field(NameStr(attribute->attname), type, 
!attribute->attnotnull));
                }
                auto schema = arrow::schema(fields);
-               auto option = arrow::ipc::IpcWriteOptions::Defaults();
-               option.emit_dictionary_deltas = true;
-               ARROW_ASSIGN_OR_RAISE(auto writer,
-                                     arrow::ipc::MakeStreamWriter(output, 
schema, option));
                ARROW_ASSIGN_OR_RAISE(
                        auto builder,
                        arrow::RecordBatchBuilder::Make(schema, 
arrow::default_memory_pool()));
+               auto option = arrow::ipc::IpcWriteOptions::Defaults();
+               option.emit_dictionary_deltas = true;
+
+               // Write schema only stream format data to return only schema.
+               ARROW_ASSIGN_OR_RAISE(auto writer,
+                                     arrow::ipc::MakeStreamWriter(&output, 
schema, option));
+               // Build 1 null row to write schema.
+               for (uint64_t iAttribute = 0; iAttribute < 
SPI_tuptable->tupdesc->natts;
+                    ++iAttribute)
+               {
+                       auto arrayBuilder = builder->GetField(iAttribute);
+                       ARROW_RETURN_NOT_OK(arrayBuilder->AppendNull());
+               }
+               ARROW_ASSIGN_OR_RAISE(auto recordBatch, builder->Flush());
+               P("%s: %s: write: schema: WriteRecordBatch", Tag, tag_);
+               ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
+               P("%s: %s: write: schema: Close", Tag, tag_);
+               ARROW_RETURN_NOT_OK(writer->Close());
+
+               // Write another stream format data with record batches.
+               ARROW_ASSIGN_OR_RAISE(writer,
+                                     arrow::ipc::MakeStreamWriter(&output, 
schema, option));
                for (uint64_t iTuple = 0; iTuple < SPI_processed; ++iTuple)
                {
+                       P("%s: %s: write: data: record batch: %d/%d",
+                         Tag,
+                         tag_,
+                         iTuple,
+                         SPI_processed);
                        for (uint64_t iAttribute = 0; iAttribute < 
SPI_tuptable->tupdesc->natts;
                             ++iAttribute)
                        {
+                               P("%s: %s: write: data: record batch: %d/%d: 
%d/%d",
+                                 Tag,
+                                 tag_,
+                                 iTuple,
+                                 SPI_processed,
+                                 iAttribute,
+                                 SPI_tuptable->tupdesc->natts);
                                bool isNull;
                                auto datum = 
SPI_getbinval(SPI_tuptable->vals[iTuple],
                                                           
SPI_tuptable->tupdesc,
@@ -300,10 +740,12 @@ class Executor : public WorkerProcessor {
                                }
                        }
                }
-               ARROW_ASSIGN_OR_RAISE(auto recordBatch, builder->Flush());
+               ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
+               P("%s: %s: write: data: WriteRecordBatch", Tag, tag_);
                ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
+               P("%s: %s: write: data: Close", Tag, tag_);
                ARROW_RETURN_NOT_OK(writer->Close());
-               return output->Finish();
+               return output.Close();
        }
 };
 
@@ -325,43 +767,50 @@ class Proxy : public WorkerProcessor {
                setSharedString(sharedData_->connectData.password, password);
                LWLockRelease(lock_);
                kill(sharedData_->mainPID, SIGUSR1);
-               std::unique_lock<std::mutex> lock(mutex_);
-               condition_variable_.wait(lock,
-                                        [&] { return sharedData_->executorPID 
!= InvalidPid; });
+               {
+                       std::unique_lock<std::mutex> lock(mutex_);
+                       conditionVariable_.wait(
+                               lock, [&] { return sharedData_->executorPID != 
InvalidPid; });
+               }
                return arrow::Status::OK();
        }
 
-       arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> execute(
-               const std::string& query)
+       // Hands `query` to the executor through shared memory and returns the
+       // schema found in the first IPC stream the executor sends back.
+       arrow::Result<std::shared_ptr<arrow::Schema>> execute(const std::string& query)
        {
                LWLockAcquire(lock_, LW_EXCLUSIVE);
                setSharedString(sharedData_->executeData.query, query);
                LWLockRelease(lock_);
                if (sharedData_->executorPID != InvalidPid)
                {
+                       P("%s: %s: execute: kill executor: %d", Tag, tag_, sharedData_->executorPID);
                        kill(sharedData_->executorPID, SIGUSR1);
                }
-               std::unique_lock<std::mutex> lock(mutex_);
-               condition_variable_.wait(
-                       lock, [&] { return sharedData_->executeData.buffer.used != 0; });
-               return read();
+               P("%s: %s: execute: open", Tag, tag_);
+               // The first stream only exists to carry the schema.
+               ARROW_ASSIGN_OR_RAISE(auto reader,
+                                     arrow::ipc::RecordBatchStreamReader::Open(
+                                             std::make_shared<SharedRingBufferInputStream>(this)));
+               // Consume the stream to its end; the batches are discarded.
+               std::shared_ptr<arrow::RecordBatch> recordBatch;
+               do
+               {
+                       P("%s: %s: execute: read next", Tag, tag_);
+                       ARROW_RETURN_NOT_OK(reader->ReadNext(&recordBatch));
+               } while (recordBatch);
+               P("%s: %s: execute: schema", Tag, tag_);
+               return reader->schema();
        }
 
        arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> read()
        {
-               auto input = std::make_shared<arrow::io::BufferReader>(
-                       static_cast<const uint8_t*>(
-                               dsa_get_address(area_, sharedData_->executeData.buffer.data)),
-                       sharedData_->executeData.buffer.used);
+               // Opens the second IPC stream, the one carrying the record
+               // batches, directly on top of the shared ring buffer.
+               return arrow::ipc::RecordBatchStreamReader::Open(
+                       std::make_shared<SharedRingBufferInputStream>(this));
        }
 
-       void signaled()
-       {
-               std::lock_guard<std::mutex> lock(mutex_);
-               condition_variable_.notify_all();
-       }
-
    private:
        void setSharedString(dsa_pointer& pointer, const std::string& input)
        {
@@ -372,9 +821,6 @@ class Proxy : public WorkerProcessor {
                pointer = dsa_allocate(area_, input.size() + 1);
                memcpy(dsa_get_address(area_, pointer), input.c_str(), 
input.size() + 1);
        }
-
-       std::mutex mutex_;
-       std::condition_variable condition_variable_;
 };
 
 class MainProcessor : public Processor {
@@ -388,7 +834,9 @@ class MainProcessor : public Processor {
                if (found)
                {
                        LWLockRelease(AddinShmemInitLock);
-                       elog(ERROR, "%s: %s: shared data is already created", 
Tag, tag_);
+                       ereport(ERROR,
+                               errcode(ERRCODE_INTERNAL_ERROR),
+                               errmsg("%s: %s: shared data is already 
created", Tag, tag_));
                }
                auto area = dsa_create(LWLockNewTrancheId());
                sharedData->handle = dsa_get_handle(area);
@@ -398,9 +846,7 @@ class MainProcessor : public Processor {
                sharedData->connectData.databaseName = InvalidDsaPointer;
                sharedData->connectData.userName = InvalidDsaPointer;
                sharedData->connectData.password = InvalidDsaPointer;
-               sharedData->executeData.buffer.data = InvalidDsaPointer;
-               sharedData->executeData.buffer.total = 0;
-               sharedData->executeData.buffer.used = 0;
+               
SharedRingBuffer::initialize_data(&(sharedData->executeData.bufferData));
                lock_ = &(GetNamedLWLockTranche(LWLockTrancheName)[0].lock);
                LWLockRelease(AddinShmemInitLock);
                sharedData_ = sharedData;
@@ -422,7 +868,9 @@ class MainProcessor : public Processor {
                BackgroundWorkerHandle* handle;
                if (!RegisterDynamicBackgroundWorker(&worker, &handle))
                {
-                       elog(ERROR, "%s: %s: failed to start server", Tag, 
tag_);
+                       ereport(ERROR,
+                               errcode(ERRCODE_INTERNAL_ERROR),
+                               errmsg("%s: %s: failed to start server", Tag, 
tag_));
                }
                WaitForBackgroundWorkerStartup(handle, 
&(sharedData_->serverPID));
                return handle;
@@ -449,7 +897,9 @@ class MainProcessor : public Processor {
                BackgroundWorkerHandle* handle;
                if (!RegisterDynamicBackgroundWorker(&worker, &handle))
                {
-                       elog(ERROR, "%s: %s: failed to start executor", Tag, 
tag_);
+                       ereport(ERROR,
+                               errcode(ERRCODE_INTERNAL_ERROR),
+                               errmsg("%s: %s: failed to start executor", Tag, 
tag_));
                }
                WaitForBackgroundWorkerStartup(handle, 
&(sharedData_->executorPID));
                kill(sharedData_->serverPID, SIGUSR1);
@@ -514,8 +964,7 @@ class FlightSQLServer : public 
arrow::flight::sql::FlightSqlServerBase {
                const arrow::flight::FlightDescriptor& descriptor)
        {
                const auto& query = command.query;
-               ARROW_ASSIGN_OR_RAISE(auto reader, proxy_->execute(query));
-               auto schema = reader->schema();
+               ARROW_ASSIGN_OR_RAISE(auto schema, proxy_->execute(query));
                ARROW_ASSIGN_OR_RAISE(auto ticket,
                                      
arrow::flight::sql::CreateStatementQueryTicket(query));
                std::vector<arrow::flight::FlightEndpoint> endpoints{
@@ -562,6 +1011,7 @@ afs_server_internal(Proxy* proxy)
                CHECK_FOR_INTERRUPTS();
        }
 
+       // TODO: Use before_shmem_exit()
        auto deadline = std::chrono::system_clock::now() + 
std::chrono::microseconds(10);
        return flightSQLServer.Shutdown(&deadline);
 }
@@ -586,7 +1036,7 @@ afs_executor(Datum arg)
                        {
                                events |= WL_TIMEOUT;
                        }
-                       int conditions = WaitLatch(MyLatch, events, timeout, 
PG_WAIT_EXTENSION);
+                       auto conditions = WaitLatch(MyLatch, events, timeout, 
PG_WAIT_EXTENSION);
 
                        if (conditions & WL_TIMEOUT)
                        {
@@ -598,11 +1048,12 @@ afs_executor(Datum arg)
                        if (GotSIGUSR1)
                        {
                                GotSIGUSR1 = false;
-                               executor.execute();
+                               executor.signaled();
                        }
 
                        CHECK_FOR_INTERRUPTS();
                }
+               // TODO: Use before_shmem_exit()
                executor.close();
        }
 
@@ -617,11 +1068,16 @@ afs_server(Datum arg)
        BackgroundWorkerUnblockSignals();
 
        {
-               Proxy proxy;
-               auto status = afs_server_internal(&proxy);
+               arrow::Status status;
+               {
+                       Proxy proxy;
+                       status = afs_server_internal(&proxy);
+               }
                if (!status.ok())
                {
-                       elog(ERROR, "%s: server: failed: %s", Tag, 
status.ToString().c_str());
+                       ereport(ERROR,
+                               errcode(ERRCODE_INTERNAL_ERROR),
+                               errmsg("%s: server: failed: %s", Tag, 
status.ToString().c_str()));
                }
        }
 
@@ -651,7 +1107,6 @@ afs_main(Datum arg)
 
                        CHECK_FOR_INTERRUPTS();
                }
-
                WaitForBackgroundWorkerShutdown(serverHandle);
        }
 

Reply via email to