This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/arrow-flight-sql-postgresql.git


The following commit(s) were added to refs/heads/main by this push:
     new a5112a4  Add support for concurrent session (#37)
a5112a4 is described below

commit a5112a4f23793e04fbb598e14bbb06846cd39212
Author: Sutou Kouhei <[email protected]>
AuthorDate: Fri May 5 12:57:43 2023 +0900

    Add support for concurrent session (#37)
    
    Closes GH-20
---
 benchmark/integer/select-adbc-flight-sql.rb |   6 +-
 benchmark/integer/select-adbc-postgresql.rb |   4 +-
 benchmark/integer/select.rb                 |   7 +-
 src/afs.cc                                  | 906 ++++++++++++++++++----------
 test/test-flight-sql.rb                     |  23 +-
 5 files changed, 628 insertions(+), 318 deletions(-)

diff --git a/benchmark/integer/select-adbc-flight-sql.rb 
b/benchmark/integer/select-adbc-flight-sql.rb
index 6155ca2..7b8ac3b 100755
--- a/benchmark/integer/select-adbc-flight-sql.rb
+++ b/benchmark/integer/select-adbc-flight-sql.rb
@@ -24,14 +24,16 @@ require "adbc"
 options = {
   "driver" => "adbc_driver_flightsql",
   "uri" => "grpc://127.0.0.1:15432",
+  "username" => ENV["PGUSER"] || ENV["USER"],
+  "password" => ENV["PGPASSWORD"] || "",
   "adbc.flight.sql.rpc.call_header.x-flight-sql-database" => "afs_benchmark",
 }
 ADBC::Database.open(**options) do |database|
   database.connect do |connection|
     connection.open_statement do |statement|
       before = Time.now
-      table, n_rows_affected = statement.query("SELECT * FROM data")
-      # p table
+      _table, _n_rows_affected = statement.query("SELECT * FROM data")
+      # p _table
       puts("%.3fsec" % (Time.now - before))
     end
   end
diff --git a/benchmark/integer/select-adbc-postgresql.rb 
b/benchmark/integer/select-adbc-postgresql.rb
index 1fba137..f78f1c8 100755
--- a/benchmark/integer/select-adbc-postgresql.rb
+++ b/benchmark/integer/select-adbc-postgresql.rb
@@ -29,8 +29,8 @@ ADBC::Database.open(**options) do |database|
   database.connect do |connection|
     connection.open_statement do |statement|
       before = Time.now
-      table, n_rows_affected = statement.query("SELECT * FROM data")
-      # p table
+      _table, _n_rows_affected = statement.query("SELECT * FROM data")
+      # p _table
       puts("%.3fsec" % (Time.now - before))
     end
   end
diff --git a/benchmark/integer/select.rb b/benchmark/integer/select.rb
index bc2898c..f676e42 100755
--- a/benchmark/integer/select.rb
+++ b/benchmark/integer/select.rb
@@ -24,12 +24,15 @@ require "arrow-flight-sql"
 call_options = ArrowFlight::CallOptions.new
 call_options.add_header("x-flight-sql-database", "afs_benchmark")
 client = ArrowFlight::Client.new("grpc://127.0.0.1:15432")
+client.authenticate_basic(ENV["PGUSER"] || ENV["USER"],
+                          ENV["PGPASSWORD"] || "",
+                          call_options)
 sql_client = ArrowFlightSQL::Client.new(client)
 
 before = Time.now
 info = sql_client.execute("SELECT * FROM data", call_options)
 endpoint = info.endpoints.first
 reader = sql_client.do_get(endpoint.ticket, call_options)
-table = reader.read_all
-# p table
+_table = reader.read_all
+# p _table
 puts("%.3fsec" % (Time.now - before))
diff --git a/src/afs.cc b/src/afs.cc
index 12798c8..b33546f 100644
--- a/src/afs.cc
+++ b/src/afs.cc
@@ -22,6 +22,7 @@ extern "C"
 #include <access/xact.h>
 #include <executor/spi.h>
 #include <fmgr.h>
+#include <lib/dshash.h>
 #include <miscadmin.h>
 #include <postmaster/bgworker.h>
 #include <storage/ipc.h>
@@ -48,7 +49,9 @@ extern "C"
 #include <arrow/table_builder.h>
 #include <arrow/util/base64.h>
 
+#include <cinttypes>
 #include <condition_variable>
+#include <random>
 #include <sstream>
 
 #ifdef __GNUC__
@@ -126,12 +129,6 @@ afs_shmem_request_hook(void)
        RequestNamedLWLockTranche(LWLockTrancheName, 1);
 }
 
-struct ConnectData {
-       dsa_pointer databaseName;
-       dsa_pointer userName;
-       dsa_pointer password;
-};
-
 struct SharedRingBufferData {
        dsa_pointer pointer;
        size_t total;
@@ -149,6 +146,13 @@ class SharedRingBuffer {
                data->tail = 0;
        }
 
+       static void free_data(SharedRingBufferData* data, dsa_area* area)
+       {
+               if (data->pointer != InvalidDsaPointer)
+                       dsa_free(area, data->pointer);
+               initialize_data(data);
+       }
+
        SharedRingBuffer(SharedRingBufferData* data, dsa_area* area)
                : data_(data), area_(area)
        {
@@ -162,11 +166,7 @@ class SharedRingBuffer {
                data_->tail = 0;
        }
 
-       void free()
-       {
-               dsa_free(area_, data_->pointer);
-               initialize_data(data_);
-       }
+       void free() { free_data(data_, area_); }
 
        size_t size() const
        {
@@ -302,18 +302,47 @@ class SharedRingBuffer {
        }
 };
 
-struct ExecuteData {
+struct SessionData {
+       uint64_t id;
+       arrow::Status status;
+       pid_t executorPID;
+       bool initialized;
+       dsa_pointer databaseName;
+       dsa_pointer userName;
+       dsa_pointer password;
        dsa_pointer query;
        SharedRingBufferData bufferData;
 };
 
+class SessionReleaser {
+   public:
+       explicit SessionReleaser(dshash_table* sessions, SessionData* data)
+               : sessions_(sessions), data_(data)
+       {
+       }
+
+       ~SessionReleaser() { dshash_release_lock(sessions_, data_); }
+
+   private:
+       dshash_table* sessions_;
+       SessionData* data_;
+};
+
+static dshash_parameters SessionsParams = {
+       sizeof(uint32_t),
+       sizeof(SessionData),
+       dshash_memcmp,
+       dshash_memhash,
+       0,  // Set later because this is determined dynamically.
+};
+
 struct SharedData {
+       int trancheID;
        dsa_handle handle;
-       pid_t executorPID;
+       int sessionsTrancheID;
+       dshash_table_handle sessionsHandle;
        pid_t serverPID;
        pid_t mainPID;
-       ConnectData connectData;
-       ExecuteData executeData;
 };
 
 class Processor {
@@ -328,84 +357,18 @@ class Processor {
        {
        }
 
-       virtual ~Processor() { dsa_detach(area_); }
-
-       const char* tag() { return tag_; }
-
-       SharedRingBuffer create_shared_ring_buffer()
+       virtual ~Processor()
        {
-               return SharedRingBuffer(&(sharedData_->executeData.bufferData), 
area_);
+               if (area_)
+                       dsa_detach(area_);
        }
 
+       const char* tag() { return tag_; }
+
        void lock_acquire(LWLockMode mode) { LWLockAcquire(lock_, 
LW_EXCLUSIVE); }
 
        void lock_release() { LWLockRelease(lock_); }
 
-       void wait_executor_written(SharedRingBuffer* buffer)
-       {
-               if (ARROW_PREDICT_FALSE(sharedData_->executorPID == InvalidPid))
-               {
-                       ereport(ERROR,
-                               errcode(ERRCODE_INTERNAL_ERROR),
-                               errmsg("%s: %s: executor isn't alive", Tag, 
tag_));
-               }
-
-               P("%s: %s: %s: kill executor: %d", Tag, tag_, AFS_FUNC, 
sharedData_->executorPID);
-               kill(sharedData_->executorPID, SIGUSR1);
-               auto size = buffer->size();
-               std::unique_lock<std::mutex> lock(mutex_);
-               conditionVariable_.wait(lock, [&] {
-                       P("%s: %s: %s: wait: write: %d:%d",
-                         Tag,
-                         tag_,
-                         AFS_FUNC,
-                         buffer->size(),
-                         size);
-                       return buffer->size() != size;
-               });
-       }
-
-       void wait_server_read(SharedRingBuffer* buffer)
-       {
-               if (ARROW_PREDICT_FALSE(sharedData_->serverPID == InvalidPid))
-               {
-                       ereport(ERROR,
-                               errcode(ERRCODE_INTERNAL_ERROR),
-                               errmsg("%s: %s: server isn't alive", Tag, 
tag_));
-               }
-
-               P("%s: %s: %s: kill server: %d", Tag, tag_, AFS_FUNC, 
sharedData_->serverPID);
-               kill(sharedData_->serverPID, SIGUSR1);
-               auto restSize = buffer->rest_size();
-               while (true)
-               {
-                       int events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
-                       WaitLatch(MyLatch, events, -1, PG_WAIT_EXTENSION);
-                       if (GotSIGTERM)
-                       {
-                               break;
-                       }
-                       ResetLatch(MyLatch);
-
-                       if (GotSIGUSR1)
-                       {
-                               GotSIGUSR1 = false;
-                               P("%s: %s: %s: wait: read: %d:%d",
-                                 Tag,
-                                 tag_,
-                                 AFS_FUNC,
-                                 buffer->rest_size(),
-                                 restSize);
-                               if (buffer->rest_size() != restSize)
-                               {
-                                       break;
-                               }
-                       }
-
-                       CHECK_FOR_INTERRUPTS();
-               }
-       }
-
        void signaled()
        {
                P("%s: %s: signaled: before", Tag, tag_);
@@ -422,10 +385,15 @@ class Processor {
        std::condition_variable conditionVariable_;
 };
 
+class Proxy;
 class SharedRingBufferInputStream : public arrow::io::InputStream {
    public:
-       SharedRingBufferInputStream(Processor* processor)
-               : arrow::io::InputStream(), processor_(processor), 
position_(0), is_open_(true)
+       SharedRingBufferInputStream(Proxy* proxy, SessionData* session)
+               : arrow::io::InputStream(),
+                 proxy_(proxy),
+                 session_(session),
+                 position_(0),
+                 is_open_(true)
        {
        }
 
@@ -439,33 +407,7 @@ class SharedRingBufferInputStream : public 
arrow::io::InputStream {
 
        arrow::Result<int64_t> Tell() const override { return position_; }
 
-       arrow::Result<int64_t> Read(int64_t nBytes, void* out) override
-       {
-               if (ARROW_PREDICT_FALSE(!is_open_))
-               {
-                       return arrow::Status::IOError(std::string(Tag) + ": " + 
processor_->tag() +
-                                                     ": 
SharedRingBufferInputStream is closed");
-               }
-               auto buffer = 
std::move(processor_->create_shared_ring_buffer());
-               size_t rest = static_cast<size_t>(nBytes);
-               while (true)
-               {
-                       processor_->lock_acquire(LW_EXCLUSIVE);
-                       auto readBytes = buffer.read(rest, out);
-                       processor_->lock_release();
-
-                       position_ += readBytes;
-                       rest -= readBytes;
-                       out = static_cast<uint8_t*>(out) + readBytes;
-                       if (ARROW_PREDICT_TRUE(rest == 0))
-                       {
-                               break;
-                       }
-
-                       processor_->wait_executor_written(&buffer);
-               }
-               return nBytes;
-       }
+       arrow::Result<int64_t> Read(int64_t nBytes, void* out) override;
 
        arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nBytes) 
override
        {
@@ -477,15 +419,17 @@ class SharedRingBufferInputStream : public 
arrow::io::InputStream {
        }
 
    private:
-       Processor* processor_;
+       Proxy* proxy_;
+       SessionData* session_;
        int64_t position_;
        bool is_open_;
 };
 
+class Executor;
 class SharedRingBufferOutputStream : public arrow::io::OutputStream {
    public:
-       SharedRingBufferOutputStream(Processor* processor)
-               : arrow::io::OutputStream(), processor_(processor), 
position_(0), is_open_(true)
+       SharedRingBufferOutputStream(Executor* executor)
+               : arrow::io::OutputStream(), executor_(executor), position_(0), 
is_open_(true)
        {
        }
 
@@ -499,54 +443,24 @@ class SharedRingBufferOutputStream : public 
arrow::io::OutputStream {
 
        arrow::Result<int64_t> Tell() const override { return position_; }
 
-       arrow::Status Write(const void* data, int64_t nBytes) override
-       {
-               if (ARROW_PREDICT_FALSE(!is_open_))
-               {
-                       return arrow::Status::IOError(std::string(Tag) + ": " + 
processor_->tag() +
-                                                     ": 
SharedRingBufferOutputStream is closed");
-               }
-               if (ARROW_PREDICT_TRUE(nBytes > 0))
-               {
-                       auto buffer = 
std::move(processor_->create_shared_ring_buffer());
-                       size_t rest = static_cast<size_t>(nBytes);
-                       while (true)
-                       {
-                               processor_->lock_acquire(LW_EXCLUSIVE);
-                               auto writtenSize = buffer.write(data, rest);
-                               processor_->lock_release();
-
-                               position_ += writtenSize;
-                               rest -= writtenSize;
-                               data = static_cast<const uint8_t*>(data) + 
writtenSize;
-
-                               if (ARROW_PREDICT_TRUE(rest == 0))
-                               {
-                                       break;
-                               }
-
-                               processor_->wait_server_read(&buffer);
-                       }
-               }
-               return arrow::Status::OK();
-       }
+       arrow::Status Write(const void* data, int64_t nBytes) override;
 
        using arrow::io::OutputStream::Write;
 
    private:
-       Processor* processor_;
+       Executor* executor_;
        int64_t position_;
        bool is_open_;
 };
 
 class WorkerProcessor : public Processor {
    public:
-       explicit WorkerProcessor(const char* tag) : Processor(tag)
+       explicit WorkerProcessor(const char* tag) : Processor(tag), 
sessions_(nullptr)
        {
                LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
                bool found;
-               auto sharedData = static_cast<SharedData*>(
-                       ShmemInitStruct(SharedDataName, sizeof(SharedData), 
&found));
+               sharedData_ = static_cast<SharedData*>(
+                       ShmemInitStruct(SharedDataName, sizeof(sharedData_), 
&found));
                if (!found)
                {
                        LWLockRelease(AddinShmemInitLock);
@@ -554,83 +468,156 @@ class WorkerProcessor : public Processor {
                                errcode(ERRCODE_INTERNAL_ERROR),
                                errmsg("%s: %s: shared data isn't created yet", 
Tag, tag_));
                }
-               auto area = dsa_attach(sharedData->handle);
+               area_ = dsa_attach(sharedData_->handle);
+               SessionsParams.tranche_id = sharedData_->sessionsTrancheID;
+               sessions_ =
+                       dshash_attach(area_, &SessionsParams, 
sharedData_->sessionsHandle, nullptr);
                lock_ = &(GetNamedLWLockTranche(LWLockTrancheName)[0].lock);
                LWLockRelease(AddinShmemInitLock);
-               sharedData_ = sharedData;
-               area_ = area;
        }
+
+       ~WorkerProcessor() override { dshash_detach(sessions_); }
+
+   protected:
+       void delete_session(SessionData* session)
+       {
+               session->status.~Status();
+               if (DsaPointerIsValid(session->databaseName))
+                       dsa_free(area_, session->databaseName);
+               if (DsaPointerIsValid(session->userName))
+                       dsa_free(area_, session->userName);
+               if (DsaPointerIsValid(session->password))
+                       dsa_free(area_, session->password);
+               if (DsaPointerIsValid(session->query))
+                       dsa_free(area_, session->query);
+               SharedRingBuffer::free_data(&(session->bufferData), area_);
+               dshash_delete_entry(sessions_, session);
+       }
+
+   protected:
+       dshash_table* sessions_;
 };
 
 class Executor : public WorkerProcessor {
    public:
-       explicit Executor() : WorkerProcessor("executor") {}
+       explicit Executor(uint64_t sessionID)
+               : WorkerProcessor("executor"),
+                 sessionID_(sessionID),
+                 session_(nullptr),
+                 connected_(false)
+       {
+       }
 
        void open()
        {
                pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": 
opening").c_str());
-               LWLockAcquire(lock_, LW_EXCLUSIVE);
-               BackgroundWorkerInitializeConnection(
-                       static_cast<const char*>(
-                               dsa_get_address(area_, 
sharedData_->connectData.databaseName)),
-                       static_cast<const char*>(
-                               dsa_get_address(area_, 
sharedData_->connectData.userName)),
-                       0);
-               unsetSharedString(sharedData_->connectData.databaseName);
-               unsetSharedString(sharedData_->connectData.userName);
-               unsetSharedString(sharedData_->connectData.password);
+               session_ = static_cast<SessionData*>(dshash_find(sessions_, 
&sessionID_, false));
+               auto databaseName =
+                       static_cast<const char*>(dsa_get_address(area_, 
session_->databaseName));
+               auto userName =
+                       static_cast<const char*>(dsa_get_address(area_, 
session_->userName));
+               // TODO: Check password. See src/backend/libpq/auth.c
+               BackgroundWorkerInitializeConnection(databaseName, userName, 0);
                {
-                       SharedRingBuffer 
buffer(&(sharedData_->executeData.bufferData), area_);
+                       SharedRingBuffer buffer(&(session_->bufferData), area_);
                        // TODO: Customizable.
                        buffer.allocate(1L * 1024L * 1024L);
                }
-               LWLockRelease(lock_);
                StartTransactionCommand();
                SPI_connect();
                pgstat_report_activity(STATE_IDLE, NULL);
+               session_->initialized = true;
+               connected_ = true;
+               P("%s: %s: %s: kill server: %d", Tag, tag_, AFS_FUNC, 
sharedData_->serverPID);
+               kill(sharedData_->serverPID, SIGUSR1);
        }
 
        void close()
        {
                pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": 
closing").c_str());
-               SPI_finish();
-               CommitTransactionCommand();
-               LWLockAcquire(lock_, LW_EXCLUSIVE);
+               if (connected_)
                {
-                       SharedRingBuffer 
buffer(&(sharedData_->executeData.bufferData), area_);
-                       buffer.free();
+                       SPI_finish();
+                       CommitTransactionCommand();
+                       {
+                               SharedRingBuffer 
buffer(&(session_->bufferData), area_);
+                               buffer.free();
+                       }
+                       delete_session(session_);
+               }
+               else
+               {
+                       // TODO: Improve failed to connect case.
+                       session_->status = arrow::Status::Invalid("failed to 
connect");
+                       session_->initialized = true;
+                       P("%s: %s: %s: kill server: %d", Tag, tag_, AFS_FUNC, 
sharedData_->serverPID);
+                       kill(sharedData_->serverPID, SIGUSR1);
                }
-               sharedData_->executorPID = InvalidPid;
-               LWLockRelease(lock_);
                pgstat_report_activity(STATE_IDLE, NULL);
        }
 
-       void signaled()
+       SharedRingBuffer create_shared_ring_buffer()
+       {
+               return SharedRingBuffer(&(session_->bufferData), area_);
+       }
+
+       void wait_server_read(SharedRingBuffer* buffer)
        {
-               P("%s: %s: signaled: before: %d", Tag, tag_, 
sharedData_->executeData.query);
-               P("signaled: before: %d", sharedData_->executeData.query);
-               if (DsaPointerIsValid(sharedData_->executeData.query))
+               if (ARROW_PREDICT_FALSE(sharedData_->serverPID == InvalidPid))
                {
-                       execute();
+                       ereport(ERROR,
+                               errcode(ERRCODE_INTERNAL_ERROR),
+                               errmsg("%s: %s: server isn't alive", Tag, 
tag_));
                }
-               else
+
+               P("%s: %s: %s: kill server: %d", Tag, tag_, AFS_FUNC, 
sharedData_->serverPID);
+               kill(sharedData_->serverPID, SIGUSR1);
+               auto restSize = buffer->rest_size();
+               while (true)
                {
-                       Processor::signaled();
+                       int events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+                       WaitLatch(MyLatch, events, -1, PG_WAIT_EXTENSION);
+                       if (GotSIGTERM)
+                       {
+                               break;
+                       }
+                       ResetLatch(MyLatch);
+
+                       if (GotSIGUSR1)
+                       {
+                               GotSIGUSR1 = false;
+                               P("%s: %s: %s: wait: read: %d:%d",
+                                 Tag,
+                                 tag_,
+                                 AFS_FUNC,
+                                 buffer->rest_size(),
+                                 restSize);
+                               if (buffer->rest_size() != restSize)
+                               {
+                                       break;
+                               }
+                       }
+
+                       CHECK_FOR_INTERRUPTS();
                }
-               P("%s: %s: signaled: after: %d", Tag, tag_, 
sharedData_->executeData.query);
        }
 
-   private:
-       void unsetSharedString(dsa_pointer& pointer)
+       void signaled()
        {
-               if (!DsaPointerIsValid(pointer))
+               P("%s: %s: signaled: before: %d", Tag, tag_, session_->query);
+               P("signaled: before: %d", session_->query);
+               if (DsaPointerIsValid(session_->query))
                {
-                       return;
+                       execute();
+               }
+               else
+               {
+                       Processor::signaled();
                }
-               dsa_free(area_, pointer);
-               pointer = InvalidDsaPointer;
+               P("%s: %s: signaled: after: %d", Tag, tag_, session_->query);
        }
 
+   private:
        void execute()
        {
                pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": 
executing").c_str());
@@ -638,13 +625,13 @@ class Executor : public WorkerProcessor {
                PushActiveSnapshot(GetTransactionSnapshot());
 
                LWLockAcquire(lock_, LW_EXCLUSIVE);
-               auto query = static_cast<const char*>(
-                       dsa_get_address(area_, sharedData_->executeData.query));
+               std::string query(
+                       static_cast<const char*>(dsa_get_address(area_, 
session_->query)));
+               dsa_free(area_, session_->query);
+               session_->query = InvalidDsaPointer;
                SetCurrentStatementStartTimestamp();
-               P("%s: %s: execute: %s", Tag, tag_, query);
-               auto result = SPI_execute(query, true, 0);
-               dsa_free(area_, sharedData_->executeData.query);
-               sharedData_->executeData.query = InvalidDsaPointer;
+               P("%s: %s: execute: %s", Tag, tag_, query.c_str());
+               auto result = SPI_execute(query.c_str(), true, 0);
                LWLockRelease(lock_);
 
                if (result == SPI_OK_SELECT)
@@ -654,17 +641,25 @@ class Executor : public WorkerProcessor {
                        auto status = write();
                        if (!status.ok())
                        {
-                               ereport(ERROR,
-                                       errcode(ERRCODE_INTERNAL_ERROR),
-                                       errmsg("%s: %s: failed to write", Tag, 
tag_));
+                               session_->status = status;
                        }
                }
+               else
+               {
+                       session_->status = arrow::Status::Invalid(Tag,
+                                                                 ": ",
+                                                                 tag_,
+                                                                 ": failed to 
run a query: <",
+                                                                 query,
+                                                                 ">: ",
+                                                                 
SPI_result_code_string(result));
+               }
 
                PopActiveSnapshot();
 
                if (sharedData_->serverPID != InvalidPid)
                {
-                       P("%s: %s: kill server: %s", Tag, tag_, 
sharedData_->serverPID);
+                       P("%s: %s: kill server: %d", Tag, tag_, 
sharedData_->serverPID);
                        kill(sharedData_->serverPID, SIGUSR1);
                }
 
@@ -773,46 +768,164 @@ class Executor : public WorkerProcessor {
                ARROW_RETURN_NOT_OK(writer->Close());
                return output.Close();
        }
+
+       uint64_t sessionID_;
+       SessionData* session_;
+       bool connected_;
 };
 
+arrow::Status
+SharedRingBufferOutputStream::Write(const void* data, int64_t nBytes)
+{
+       if (ARROW_PREDICT_FALSE(!is_open_))
+       {
+               return arrow::Status::IOError(std::string(Tag) + ": " + 
executor_->tag() +
+                                             ": SharedRingBufferOutputStream 
is closed");
+       }
+       if (ARROW_PREDICT_TRUE(nBytes > 0))
+       {
+               auto buffer = std::move(executor_->create_shared_ring_buffer());
+               size_t rest = static_cast<size_t>(nBytes);
+               while (true)
+               {
+                       executor_->lock_acquire(LW_EXCLUSIVE);
+                       auto writtenSize = buffer.write(data, rest);
+                       executor_->lock_release();
+
+                       position_ += writtenSize;
+                       rest -= writtenSize;
+                       data = static_cast<const uint8_t*>(data) + writtenSize;
+
+                       if (ARROW_PREDICT_TRUE(rest == 0))
+                       {
+                               break;
+                       }
+
+                       executor_->wait_server_read(&buffer);
+               }
+       }
+       return arrow::Status::OK();
+}
+
 class Proxy : public WorkerProcessor {
    public:
-       explicit Proxy() : WorkerProcessor("proxy") {}
+       explicit Proxy()
+               : WorkerProcessor("proxy"), randomSeed_(), 
randomEngine_(randomSeed_())
+       {
+       }
 
-       arrow::Status connect(const std::string& databaseName,
-                             const std::string& userName,
-                             const std::string& password)
+       SharedRingBuffer create_shared_ring_buffer(SessionData* session)
        {
-               if (sharedData_->executorPID != InvalidPid)
+               return SharedRingBuffer(&(session->bufferData), area_);
+       }
+
+       void wait_executor_written(SessionData* session, SharedRingBuffer* 
buffer)
+       {
+               if (ARROW_PREDICT_FALSE(session->executorPID == InvalidPid))
                {
-                       return arrow::Status::OK();
+                       ereport(ERROR,
+                               errcode(ERRCODE_INTERNAL_ERROR),
+                               errmsg("%s: %s: executor isn't alive", Tag, 
tag_));
                }
-               LWLockAcquire(lock_, LW_EXCLUSIVE);
-               setSharedString(sharedData_->connectData.databaseName, 
databaseName);
-               setSharedString(sharedData_->connectData.userName, userName);
-               setSharedString(sharedData_->connectData.password, password);
-               LWLockRelease(lock_);
+
+               P("%s: %s: %s: kill executor: %d", Tag, tag_, AFS_FUNC, 
session->executorPID);
+               kill(session->executorPID, SIGUSR1);
+               auto size = buffer->size();
+               std::unique_lock<std::mutex> lock(mutex_);
+               conditionVariable_.wait(lock, [&] {
+                       P("%s: %s: %s: wait: write: %d:%d",
+                         Tag,
+                         tag_,
+                         AFS_FUNC,
+                         buffer->size(),
+                         size);
+                       if (INTERRUPTS_PENDING_CONDITION())
+                       {
+                               return true;
+                       }
+                       return buffer->size() != size;
+               });
+       }
+
+       arrow::Result<uint64_t> connect(const std::string& databaseName,
+                                       const std::string& userName,
+                                       const std::string& password)
+       {
+               auto session = create_session(databaseName, userName, password);
+               auto id = session->id;
+               dshash_release_lock(sessions_, session);
                kill(sharedData_->mainPID, SIGUSR1);
                {
                        std::unique_lock<std::mutex> lock(mutex_);
-                       conditionVariable_.wait(
-                               lock, [&] { return sharedData_->executorPID != 
InvalidPid; });
+                       conditionVariable_.wait(lock, [&] {
+                               if (INTERRUPTS_PENDING_CONDITION())
+                               {
+                                       return true;
+                               }
+                               session = 
static_cast<SessionData*>(dshash_find(sessions_, &id, false));
+                               if (!session)
+                               {
+                                       return true;
+                               }
+                               const auto initialized = session->initialized;
+                               dshash_release_lock(sessions_, session);
+                               return initialized;
+                       });
                }
-               return arrow::Status::OK();
+               session = static_cast<SessionData*>(dshash_find(sessions_, &id, 
false));
+               if (!session)
+               {
+                       return arrow::Status::Invalid("session is stale: ", id);
+               }
+               if (!session->status.ok())
+               {
+                       auto status = session->status;
+                       delete_session(session);
+                       return status;
+               }
+               dshash_release_lock(sessions_, session);
+               if (INTERRUPTS_PENDING_CONDITION())
+               {
+                       return arrow::Status::Invalid("interrupted");
+               }
+               return id;
        }
 
-       arrow::Result<std::shared_ptr<arrow::Schema>> execute(const 
std::string& query)
+       bool is_valid_session(uint64_t sessionID)
        {
-               LWLockAcquire(lock_, LW_EXCLUSIVE);
-               setSharedString(sharedData_->executeData.query, query);
-               LWLockRelease(lock_);
-               if (sharedData_->executorPID != InvalidPid)
+               auto session = find_session(sessionID);
+               if (session)
+               {
+                       dshash_release_lock(sessions_, session);
+                       return true;
+               }
+               else
+               {
+                       return false;
+               }
+       }
+
+       arrow::Result<std::shared_ptr<arrow::Schema>> execute(uint64_t 
sessionID,
+                                                             const 
std::string& query)
+       {
+               auto session = find_session(sessionID);
+               SessionReleaser sessionReleaser(sessions_, session);
+               set_shared_string(session->query, query);
+               if (session->executorPID != InvalidPid)
                {
-                       P("%s: %s: execute: kill executor: %d", Tag, tag_, 
sharedData_->executorPID);
-                       kill(sharedData_->executorPID, SIGUSR1);
+                       P("%s: %s: execute: kill executor: %d", Tag, tag_, 
session->executorPID);
+                       kill(session->executorPID, SIGUSR1);
+               }
+               {
+                       auto buffer = 
std::move(create_shared_ring_buffer(session));
+                       std::unique_lock<std::mutex> lock(mutex_);
+                       conditionVariable_.wait(lock, [&] {
+                               P("%s: %s: %s: wait: execute", Tag, tag_, 
AFS_FUNC);
+                               return !session->status.ok() || buffer.size() > 
0;
+                       });
                }
                P("%s: %s: execute: open", Tag, tag_);
-               auto input = 
std::make_shared<SharedRingBufferInputStream>(this);
+               auto input = 
std::make_shared<SharedRingBufferInputStream>(this, session);
                // Read schema only stream format data.
                ARROW_ASSIGN_OR_RAISE(auto reader,
                                      
arrow::ipc::RecordBatchStreamReader::Open(input));
@@ -830,16 +943,62 @@ class Proxy : public WorkerProcessor {
                return reader->schema();
        }
 
-       arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> read()
+       arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> read(uint64_t 
sessionID)
        {
-               auto input = 
std::make_shared<SharedRingBufferInputStream>(this);
+               auto session = find_session(sessionID);
+               SessionReleaser sessionReleaser(sessions_, session);
+               auto input = 
std::make_shared<SharedRingBufferInputStream>(this, session);
                // Read another stream format data with record batches.
                return arrow::ipc::RecordBatchStreamReader::Open(input);
        }
 
    private:
-       void setSharedString(dsa_pointer& pointer, const std::string& input)
+       SessionData* create_session(const std::string& databaseName,
+                                   const std::string& userName,
+                                   const std::string& password)
        {
+               LWLockAcquire(lock_, LW_EXCLUSIVE);
+               uint64_t id = 0;
+               SessionData* session = nullptr;
+               do
+               {
+                       id = randomEngine_();
+                       if (id == 0)
+                       {
+                               continue;
+                       }
+                       bool found = false;
+                       session =
+                               
static_cast<SessionData*>(dshash_find_or_insert(sessions_, &id, &found));
+                       if (!found)
+                       {
+                               break;
+                       }
+               } while (true);
+               new (&(session->status)) arrow::Status;
+               session->executorPID = InvalidPid;
+               session->initialized = false;
+               set_shared_string(session->databaseName, databaseName);
+               set_shared_string(session->userName, userName);
+               set_shared_string(session->password, password);
+               session->query = InvalidDsaPointer;
+               SharedRingBuffer::initialize_data(&(session->bufferData));
+               LWLockRelease(lock_);
+               return session;
+       }
+
+       SessionData* find_session(uint64_t sessionID)
+       {
+               return static_cast<SessionData*>(dshash_find(sessions_, 
&sessionID, false));
+       }
+
+       void set_shared_string(dsa_pointer& pointer, const std::string& input)
+       {
+               if (DsaPointerIsValid(pointer))
+               {
+                       dsa_free(area_, pointer);
+                       pointer = InvalidDsaPointer;
+               }
                if (input.empty())
                {
                        return;
@@ -847,16 +1006,53 @@ class Proxy : public WorkerProcessor {
                pointer = dsa_allocate(area_, input.size() + 1);
                memcpy(dsa_get_address(area_, pointer), input.c_str(), 
input.size() + 1);
        }
+
+       std::random_device randomSeed_;
+       std::mt19937_64 randomEngine_;
 };
 
+arrow::Result<int64_t>
+SharedRingBufferInputStream::Read(int64_t nBytes, void* out)
+{
+       if (ARROW_PREDICT_FALSE(!is_open_))
+       {
+               return arrow::Status::IOError(std::string(Tag) + ": " + 
proxy_->tag() +
+                                             ": SharedRingBufferInputStream is 
closed");
+       }
+       auto buffer = std::move(proxy_->create_shared_ring_buffer(session_));
+       size_t rest = static_cast<size_t>(nBytes);
+       while (true)
+       {
+               proxy_->lock_acquire(LW_EXCLUSIVE);
+               auto readBytes = buffer.read(rest, out);
+               proxy_->lock_release();
+
+               position_ += readBytes;
+               rest -= readBytes;
+               out = static_cast<uint8_t*>(out) + readBytes;
+               if (ARROW_PREDICT_TRUE(rest == 0))
+               {
+                       break;
+               }
+
+               proxy_->wait_executor_written(session_, &buffer);
+               if (INTERRUPTS_PENDING_CONDITION())
+               {
+                       return arrow::Status::IOError(std::string(Tag) + ": " + 
proxy_->tag() +
+                                                     ": interrupted");
+               }
+       }
+       return nBytes;
+}
+
 class MainProcessor : public Processor {
    public:
-       MainProcessor() : Processor("main")
+       MainProcessor() : Processor("main"), sessions_(nullptr)
        {
                LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
                bool found;
-               auto sharedData = static_cast<SharedData*>(
-                       ShmemInitStruct(SharedDataName, sizeof(SharedData), 
&found));
+               sharedData_ = static_cast<SharedData*>(
+                       ShmemInitStruct(SharedDataName, sizeof(sharedData_), 
&found));
                if (found)
                {
                        LWLockRelease(AddinShmemInitLock);
@@ -864,21 +1060,21 @@ class MainProcessor : public Processor {
                                errcode(ERRCODE_INTERNAL_ERROR),
                                errmsg("%s: %s: shared data is already 
created", Tag, tag_));
                }
-               auto area = dsa_create(LWLockNewTrancheId());
-               sharedData->handle = dsa_get_handle(area);
-               sharedData->executorPID = InvalidPid;
-               sharedData->serverPID = InvalidPid;
-               sharedData->mainPID = MyProcPid;
-               sharedData->connectData.databaseName = InvalidDsaPointer;
-               sharedData->connectData.userName = InvalidDsaPointer;
-               sharedData->connectData.password = InvalidDsaPointer;
-               
SharedRingBuffer::initialize_data(&(sharedData->executeData.bufferData));
+               sharedData_->trancheID = LWLockNewTrancheId();
+               sharedData_->sessionsTrancheID = LWLockNewTrancheId();
+               area_ = dsa_create(sharedData_->trancheID);
+               sharedData_->handle = dsa_get_handle(area_);
+               SessionsParams.tranche_id = sharedData_->sessionsTrancheID;
+               sessions_ = dshash_create(area_, &SessionsParams, nullptr);
+               sharedData_->sessionsHandle = 
dshash_get_hash_table_handle(sessions_);
+               sharedData_->serverPID = InvalidPid;
+               sharedData_->mainPID = MyProcPid;
                lock_ = &(GetNamedLWLockTranche(LWLockTrancheName)[0].lock);
                LWLockRelease(AddinShmemInitLock);
-               sharedData_ = sharedData;
-               area_ = area;
        }
 
+       ~MainProcessor() override { dshash_destroy(sessions_); }
+
        BackgroundWorkerHandle* start_server()
        {
                BackgroundWorker worker = {0};
@@ -902,34 +1098,68 @@ class MainProcessor : public Processor {
                return handle;
        }
 
-       void process_connect_request()
+       void process_connect_requests()
        {
-               if (!DsaPointerIsValid(sharedData_->connectData.databaseName))
+               dshash_seq_status sessionsStatus;
+               dshash_seq_init(&sessionsStatus, sessions_, false);
+               SessionData* session;
+               while ((session = 
static_cast<SessionData*>(dshash_seq_next(&sessionsStatus))))
                {
-                       return;
-               }
+                       if (session->initialized)
+                       {
+                               continue;
+                       }
 
-               BackgroundWorker worker = {0};
-               // TODO: Add executor ID to bgw_name
-               snprintf(worker.bgw_name, BGW_MAXLEN, "%s: executor", Tag);
-               snprintf(worker.bgw_type, BGW_MAXLEN, "%s: executor", Tag);
-               worker.bgw_flags = BGWORKER_SHMEM_ACCESS | 
BGWORKER_BACKEND_DATABASE_CONNECTION;
-               worker.bgw_start_time = BgWorkerStart_ConsistentState;
-               worker.bgw_restart_time = BGW_NEVER_RESTART;
-               snprintf(worker.bgw_library_name, BGW_MAXLEN, "%s", 
LibraryName);
-               snprintf(worker.bgw_function_name, BGW_MAXLEN, "afs_executor");
-               worker.bgw_main_arg = 0;
-               worker.bgw_notify_pid = MyProcPid;
-               BackgroundWorkerHandle* handle;
-               if (!RegisterDynamicBackgroundWorker(&worker, &handle))
-               {
-                       ereport(ERROR,
-                               errcode(ERRCODE_INTERNAL_ERROR),
-                               errmsg("%s: %s: failed to start executor", Tag, 
tag_));
+                       BackgroundWorker worker = {0};
+                       snprintf(
+                               worker.bgw_name, BGW_MAXLEN, "%s: executor: %" 
PRIu64, Tag, session->id);
+                       snprintf(
+                               worker.bgw_type, BGW_MAXLEN, "%s: executor: %" 
PRIu64, Tag, session->id);
+                       worker.bgw_flags =
+                               BGWORKER_SHMEM_ACCESS | 
BGWORKER_BACKEND_DATABASE_CONNECTION;
+                       worker.bgw_start_time = BgWorkerStart_ConsistentState;
+                       worker.bgw_restart_time = BGW_NEVER_RESTART;
+                       snprintf(worker.bgw_library_name, BGW_MAXLEN, "%s", 
LibraryName);
+                       snprintf(worker.bgw_function_name, BGW_MAXLEN, 
"afs_executor");
+                       worker.bgw_main_arg = Int64GetDatum(session->id);
+                       worker.bgw_notify_pid = MyProcPid;
+                       BackgroundWorkerHandle* handle;
+                       if (RegisterDynamicBackgroundWorker(&worker, &handle))
+                       {
+                               WaitForBackgroundWorkerStartup(handle, 
&(session->executorPID));
+                       }
+                       else
+                       {
+                               session->status = arrow::Status::UnknownError(
+                                       Tag, ": ", tag_, ": failed to start 
executor: ", session->id);
+                       }
                }
-               WaitForBackgroundWorkerStartup(handle, 
&(sharedData_->executorPID));
+               dshash_seq_term(&sessionsStatus);
                kill(sharedData_->serverPID, SIGUSR1);
        }
+
+   private:
+       dshash_table* sessions_;
+};
+
+class HeaderAuthServerMiddleware : public arrow::flight::ServerMiddleware {
+   public:
+       explicit HeaderAuthServerMiddleware(uint64_t sessionID) : 
sessionID_(sessionID) {}
+
+       void SendingHeaders(arrow::flight::AddCallHeaders* outgoing_headers) 
override
+       {
+               outgoing_headers->AddHeader("authorization",
+                                           std::string("Bearer ") + 
std::to_string(sessionID_));
+       }
+
+       void CallCompleted(const arrow::Status& status) override {}
+
+       std::string name() const override { return 
"HeaderAuthServerMiddleware"; }
+
+       uint64_t session_id() { return sessionID_; }
+
+   private:
+       uint64_t sessionID_;
 };
 
 class HeaderAuthServerMiddlewareFactory : public 
arrow::flight::ServerMiddlewareFactory {
@@ -939,9 +1169,10 @@ class HeaderAuthServerMiddlewareFactory : public 
arrow::flight::ServerMiddleware
        {
        }
 
-       arrow::Status StartCall(const arrow::flight::CallInfo& info,
-                               const arrow::flight::CallHeaders& 
incoming_headers,
-                               
std::shared_ptr<arrow::flight::ServerMiddleware>* middleware)
+       arrow::Status StartCall(
+               const arrow::flight::CallInfo& info,
+               const arrow::flight::CallHeaders& incoming_headers,
+               std::shared_ptr<arrow::flight::ServerMiddleware>* middleware) 
override
        {
                std::string databaseName("postgres");
                auto databaseHeader = 
incoming_headers.find("x-flight-sql-database");
@@ -949,25 +1180,69 @@ class HeaderAuthServerMiddlewareFactory : public 
arrow::flight::ServerMiddleware
                {
                        databaseName = databaseHeader->second;
                }
-               std::string userName("");
-               std::string password("");
                auto authorizationHeader = 
incoming_headers.find("authorization");
-               if (authorizationHeader != incoming_headers.end())
+               if (authorizationHeader == incoming_headers.end())
+               {
+                       return arrow::flight::MakeFlightError(
+                               
arrow::flight::FlightStatusCode::Unauthenticated,
+                               "No authorization header");
+               }
+               auto value = authorizationHeader->second;
+               std::stringstream valueStream{std::string(value)};
+               std::string type("");
+               std::getline(valueStream, type, ' ');
+               if (type == "Basic")
                {
                        std::stringstream decodedStream(
-                               
arrow::util::base64_decode(authorizationHeader->second));
+                               
arrow::util::base64_decode(value.substr(valueStream.tellg())));
+                       std::string userName("");
+                       std::string password("");
                        std::getline(decodedStream, userName, ':');
                        std::getline(decodedStream, password);
+                       auto sessionIDResult = proxy_->connect(databaseName, 
userName, password);
+                       if (!sessionIDResult.status().ok())
+                       {
+                               return arrow::flight::MakeFlightError(
+                                       
arrow::flight::FlightStatusCode::Unauthenticated,
+                                       sessionIDResult.status().ToString());
+                       }
+                       auto sessionID = *sessionIDResult;
+                       *middleware = 
std::make_shared<HeaderAuthServerMiddleware>(sessionID);
+                       return arrow::Status::OK();
                }
-               auto status = proxy_->connect(databaseName, userName, password);
-               if (status.ok())
+               else if (type == "Bearer")
                {
-                       return status;
+                       std::string 
sessionIDString(value.substr(valueStream.tellg()));
+                       if (sessionIDString.size() == 0)
+                       {
+                               return arrow::flight::MakeFlightError(
+                                       
arrow::flight::FlightStatusCode::Unauthorized,
+                                       std::string("invalid Bearer token"));
+                       }
+                       auto start = sessionIDString.c_str();
+                       char* end = nullptr;
+                       uint64_t sessionID = std::strtoull(start, &end, 10);
+                       if (end[0] != '\0')
+                       {
+                               return arrow::flight::MakeFlightError(
+                                       
arrow::flight::FlightStatusCode::Unauthorized,
+                                       std::string("invalid Bearer token"));
+                       }
+                       if (!proxy_->is_valid_session(sessionID))
+                       {
+                               return arrow::flight::MakeFlightError(
+                                       
arrow::flight::FlightStatusCode::Unauthorized,
+                                       std::string("invalid Bearer token"));
+                       }
+                       *middleware = 
std::make_shared<HeaderAuthServerMiddleware>(sessionID);
+                       return arrow::Status::OK();
                }
                else
                {
                        return arrow::flight::MakeFlightError(
-                               
arrow::flight::FlightStatusCode::Unauthenticated, status.ToString());
+                               
arrow::flight::FlightStatusCode::Unauthenticated,
+                               std::string("authorization header must use 
Basic or Bearer: <") + type +
+                                       std::string(">"));
                }
        }
 
@@ -989,8 +1264,9 @@ class FlightSQLServer : public 
arrow::flight::sql::FlightSqlServerBase {
                const arrow::flight::sql::StatementQuery& command,
                const arrow::flight::FlightDescriptor& descriptor)
        {
+               ARROW_ASSIGN_OR_RAISE(auto sessionID, session_id(context));
                const auto& query = command.query;
-               ARROW_ASSIGN_OR_RAISE(auto schema, proxy_->execute(query));
+               ARROW_ASSIGN_OR_RAISE(auto schema, proxy_->execute(sessionID, 
query));
                ARROW_ASSIGN_OR_RAISE(auto ticket,
                                      
arrow::flight::sql::CreateStatementQueryTicket(query));
                std::vector<arrow::flight::FlightEndpoint> endpoints{
@@ -1005,11 +1281,24 @@ class FlightSQLServer : public 
arrow::flight::sql::FlightSqlServerBase {
                const arrow::flight::ServerCallContext& context,
                const arrow::flight::sql::StatementQueryTicket& command)
        {
-               ARROW_ASSIGN_OR_RAISE(auto reader, proxy_->read());
+               ARROW_ASSIGN_OR_RAISE(auto sessionID, session_id(context));
+               ARROW_ASSIGN_OR_RAISE(auto reader, proxy_->read(sessionID));
                return 
std::make_unique<arrow::flight::RecordBatchStream>(reader);
        }
 
    private:
+       arrow::Result<uint64_t> session_id(const 
arrow::flight::ServerCallContext& context)
+       {
+               auto middleware = reinterpret_cast<HeaderAuthServerMiddleware*>(
+                       context.GetMiddleware("header-auth"));
+               if (!middleware)
+               {
+                       return arrow::flight::MakeFlightError(
+                               
arrow::flight::FlightStatusCode::Unauthenticated, "no authorization");
+               }
+               return middleware->session_id();
+       }
+
        Proxy* proxy_;
 };
 
@@ -1018,6 +1307,7 @@ afs_server_internal(Proxy* proxy)
 {
        ARROW_ASSIGN_OR_RAISE(auto location, 
arrow::flight::Location::Parse(URI));
        arrow::flight::FlightServerOptions options(location);
+       options.auth_handler = 
std::make_unique<arrow::flight::NoOpAuthHandler>();
        options.middleware.push_back(
                {"header-auth", 
std::make_shared<HeaderAuthServerMiddlewareFactory>(proxy)});
        FlightSQLServer flightSQLServer(proxy);
@@ -1050,6 +1340,16 @@ afs_server_internal(Proxy* proxy)
 
 }  // namespace
 
+static void
+afs_executor_before_shmem_exit(int code, Datum arg)
+{
+       // TODO: This doesn't work. We need to improve
+       // BackgroundWorkerInitializeConnection() failed case.
+       auto executor = reinterpret_cast<Executor*>(DatumGetPointer(arg));
+       executor->close();
+       delete executor;
+}
+
 extern "C" void
 afs_executor(Datum arg)
 {
@@ -1058,42 +1358,40 @@ afs_executor(Datum arg)
        pqsignal(SIGUSR1, afs_sigusr1);
        BackgroundWorkerUnblockSignals();
 
+       auto executor = new Executor(DatumGetInt64(arg));
+       before_shmem_exit(afs_executor_before_shmem_exit, 
PointerGetDatum(executor));
+       executor->open();
+
+       while (!GotSIGTERM)
        {
-               Executor executor;
-               executor.open();
-               while (!GotSIGTERM)
+               int events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+               const long timeout = SessionTimeout * 1000;
+               if (timeout >= 0)
                {
-                       int events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
-                       const long timeout = SessionTimeout * 1000;
-                       if (timeout >= 0)
-                       {
-                               events |= WL_TIMEOUT;
-                       }
-                       auto conditions = WaitLatch(MyLatch, events, timeout, 
PG_WAIT_EXTENSION);
-
-                       if (conditions & WL_TIMEOUT)
-                       {
-                               break;
-                       }
+                       events |= WL_TIMEOUT;
+               }
+               auto conditions = WaitLatch(MyLatch, events, timeout, 
PG_WAIT_EXTENSION);
 
-                       ResetLatch(MyLatch);
+               if (conditions & WL_TIMEOUT)
+               {
+                       break;
+               }
 
-                       if (GotSIGHUP)
-                       {
-                               GotSIGHUP = false;
-                               ProcessConfigFile(PGC_SIGHUP);
-                       }
+               ResetLatch(MyLatch);
 
-                       if (GotSIGUSR1)
-                       {
-                               GotSIGUSR1 = false;
-                               executor.signaled();
-                       }
+               if (GotSIGHUP)
+               {
+                       GotSIGHUP = false;
+                       ProcessConfigFile(PGC_SIGHUP);
+               }
 
-                       CHECK_FOR_INTERRUPTS();
+               if (GotSIGUSR1)
+               {
+                       GotSIGUSR1 = false;
+                       executor->signaled();
                }
-               // TODO: Use before_shmem_exit()
-               executor.close();
+
+               CHECK_FOR_INTERRUPTS();
        }
 
        proc_exit(0);
@@ -1149,7 +1447,7 @@ afs_main(Datum arg)
                        if (GotSIGUSR1)
                        {
                                GotSIGUSR1 = false;
-                               processor.process_connect_request();
+                               processor.process_connect_requests();
                        }
 
                        CHECK_FOR_INTERRUPTS();
diff --git a/test/test-flight-sql.rb b/test/test-flight-sql.rb
index 2b26e09..71a2174 100644
--- a/test/test-flight-sql.rb
+++ b/test/test-flight-sql.rb
@@ -18,14 +18,23 @@
 class FlightSQLTest < Test::Unit::TestCase
   include Helper::Sandbox
 
+  setup do
+    unless flight_client.respond_to?(:authenticate_basic)
+      omit("ArrowFlight::Client#authenticate_basic is needed" )
+    end
+    @options = ArrowFlight::CallOptions.new
+    @options.add_header("x-flight-sql-database", @test_db_name)
+    user = @postgresql.user
+    password = ""
+    flight_client.authenticate_basic(user, password, @options)
+  end
+
   def test_select_int32
-    options = ArrowFlight::CallOptions.new
-    options.add_header("x-flight-sql-database", @test_db_name)
-    info = flight_sql_client.execute("SELECT 1 AS value", options)
+    info = flight_sql_client.execute("SELECT 1 AS value", @options)
     assert_equal(Arrow::Schema.new(value: :int32),
                  info.get_schema)
     endpoint = info.endpoints.first
-    reader = flight_sql_client.do_get(endpoint.ticket, options)
+    reader = flight_sql_client.do_get(endpoint.ticket, @options)
     assert_equal(Arrow::Table.new(value: Arrow::Int32Array.new([1])),
                  reader.read_all)
   end
@@ -34,13 +43,11 @@ class FlightSQLTest < Test::Unit::TestCase
     run_sql("CREATE TABLE data (value integer)")
     run_sql("INSERT INTO data VALUES (1), (-2), (3)")
 
-    options = ArrowFlight::CallOptions.new
-    options.add_header("x-flight-sql-database", @test_db_name)
-    info = flight_sql_client.execute("SELECT * FROM data", options)
+    info = flight_sql_client.execute("SELECT * FROM data", @options)
     assert_equal(Arrow::Schema.new(value: :int32),
                  info.get_schema)
     endpoint = info.endpoints.first
-    reader = flight_sql_client.do_get(endpoint.ticket, options)
+    reader = flight_sql_client.do_get(endpoint.ticket, @options)
     assert_equal(Arrow::Table.new(value: Arrow::Int32Array.new([1, -2, 3])),
                  reader.read_all)
   end


Reply via email to