This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-flight-sql-postgresql.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e76479  Add support for Timestamp (#75)
7e76479 is described below

commit 7e7647936c2de4cbee80a9a7df7926f9c8cae592
Author: Sutou Kouhei <[email protected]>
AuthorDate: Wed Aug 23 17:20:06 2023 +0900

    Add support for Timestamp (#75)
    
    Closes GH-57
---
 src/afs.cc              | 43 ++++++++++++++++++++++
 test/helper/sandbox.rb  |  1 +
 test/test-flight-sql.rb | 97 +++++++++++++++++++++++++++++++++++--------------
 3 files changed, 113 insertions(+), 28 deletions(-)

diff --git a/src/afs.cc b/src/afs.cc
index 3727a59..f977f52 100644
--- a/src/afs.cc
+++ b/src/afs.cc
@@ -40,6 +40,7 @@ extern "C"
 #include <utils/guc.h>
 #include <utils/memutils.h>
 #include <utils/snapmgr.h>
+#include <utils/timestamp.h>
 #include <utils/wait_event.h>
 }
 
@@ -804,6 +805,12 @@ class ArrowPGTypeConverter : public arrow::TypeVisitor {
                return arrow::Status::OK();
        }
 
+       arrow::Status Visit(const arrow::TimestampType& type)
+       {
+               oid_ = TIMESTAMPOID;
+               return arrow::Status::OK();
+       }
+
    private:
        Oid oid_;
 };
@@ -889,6 +896,34 @@ class ArrowPGValueConverter : public arrow::ArrayVisitor {
                return arrow::Status::OK();
        }
 
+       arrow::Status Visit(const arrow::TimestampArray& array)
+       {
+               const auto unit =
+                       std::static_pointer_cast<arrow::TimestampType>(array.type())->unit();
+               Timestamp value = 0;
+               switch (unit)
+               {
+                       case arrow::TimeUnit::SECOND:
+                               value += array.Value(i_row_) * 1000000;
+                               break;
+                       case arrow::TimeUnit::MILLI:
+                               value += array.Value(i_row_) * 1000;
+                               break;
+                       case arrow::TimeUnit::MICRO:
+                               value += array.Value(i_row_);
+                               break;
+                       case arrow::TimeUnit::NANO:
+                               value += array.Value(i_row_) / 1000;
+                               break;
+                       default:
+                               return arrow::Status::NotImplemented("Unsupported time unit: ", unit);
+               }
+               // Arrow uses UNIX epoch (1970-01-01) but PostgreSQL uses 2000-01-01.
+               value -= (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY;
+               datum_ = TimestampGetDatum(value);
+               return arrow::Status::OK();
+       }
+
    private:
        int64_t i_row_;
        Datum& datum_;
@@ -917,6 +952,8 @@ class PGArrowValueConverter : public arrow::ArrayVisitor {
                                return arrow::utf8();
                        case BYTEAOID:
                                return arrow::binary();
+                       case TIMESTAMPOID:
+                               return arrow::timestamp(arrow::TimeUnit::MICRO);
                        default:
                                return arrow::Status::NotImplemented("Unsupported PostgreSQL type: ",
                                                                     attribute_->atttypid);
@@ -949,6 +986,12 @@ class PGArrowValueConverter : public arrow::ArrayVisitor {
                        case BYTEAOID:
                                return static_cast<arrow::BinaryBuilder*>(builder)->Append(
                                        VARDATA_ANY(datum), VARSIZE_ANY_EXHDR(datum));
+                       case TIMESTAMPOID:
+                               // Arrow uses UNIX epoch (1970-01-01) but PostgreSQL
+                               // uses 2000-01-01.
+                               return static_cast<arrow::TimestampBuilder*>(builder)->Append(
+                                       DatumGetTimestamp(datum) +
+                                       (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY);
                        default:
                                return arrow::Status::NotImplemented("Unsupported PostgreSQL type: ",
                                                                     attribute_->atttypid);
diff --git a/test/helper/sandbox.rb b/test/helper/sandbox.rb
index 995a147..c11b2da 100644
--- a/test/helper/sandbox.rb
+++ b/test/helper/sandbox.rb
@@ -18,6 +18,7 @@
 require "fileutils"
 require "socket"
 require "tempfile"
+require "time"
 
 require "arrow-flight-sql"
 
diff --git a/test/test-flight-sql.rb b/test/test-flight-sql.rb
index 1a50d63..4465d56 100644
--- a/test/test-flight-sql.rb
+++ b/test/test-flight-sql.rb
@@ -31,6 +31,8 @@ class FlightSQLTest < Test::Unit::TestCase
 
   def to_sql(value)
     case value
+    when Time
+      "'#{value.dup.utc.strftime("%Y-%m-%d %H:%M:%S.%6N")}'"
     when String
       sql_string = "'"
       value.each_char do |char|
@@ -54,17 +56,21 @@ class FlightSQLTest < Test::Unit::TestCase
     end
   end
 
-  data("int16",  ["smallint",         Arrow::Int16Array, -2])
-  data("int32",  ["integer",          Arrow::Int32Array, -2])
-  data("int64",  ["bigint",           Arrow::Int64Array, -2])
-  data("float",  ["real",             Arrow::FloatArray, -2.2])
-  data("double", ["double precision", Arrow::DoubleArray, -2.2])
-  data("string - text",    ["text",        Arrow::StringArray, "b"])
-  data("string - varchar", ["varchar(10)", Arrow::StringArray, "b"])
-  data("binary", ["bytea", Arrow::BinaryArray, "\x0".b])
+  timestamp_value = Time.parse("2023-08-02T18:03:13.526572321Z").utc
+
+  data("int16",  ["smallint",         :int16, -2])
+  data("int32",  ["integer",          :int32, -2])
+  data("int64",  ["bigint",           :int64, -2])
+  data("float",  ["real",             :float, -2.2])
+  data("double", ["double precision", :double, -2.2])
+  data("string - text",    ["text",        :string, "b"])
+  data("string - varchar", ["varchar(10)", :string, "b"])
+  data("binary", ["bytea", :binary, "\x0".b])
+  data("timestamp", ["timestamp", [:timestamp, :micro], timestamp_value])
   def test_select_type
-    pg_type, array_class, value = data
-    values = array_class.new([value])
+    pg_type, data_type, value = data
+    data_type = Arrow::DataType.resolve(data_type)
+    values = data_type.build_array([value])
     sql = "SELECT #{to_sql(value)}::#{pg_type} AS value"
     info = flight_sql_client.execute(sql, @options)
     assert_equal(Arrow::Schema.new(value: values.value_data_type),
@@ -111,39 +117,72 @@ SELECT * FROM data
     RESULT
   end
 
-  data("int8",   ["smallint", Arrow::Int8Array,   [1, -2, 3]])
-  data("int16",  ["smallint", Arrow::Int16Array,  [1, -2, 3]])
-  data("int32",  ["integer",  Arrow::Int32Array,  [1, -2, 3]])
-  data("int64",  ["bigint",   Arrow::Int64Array,  [1, -2, 3]])
-  data("uint8",  ["smallint", Arrow::UInt8Array,  [1,  2, 3]])
-  data("uint16", ["smallint", Arrow::UInt16Array, [1,  2, 3]])
-  data("uint32", ["integer",  Arrow::UInt32Array, [1,  2, 3]])
-  data("uint64", ["bigint",   Arrow::UInt64Array, [1,  2, 3]])
-  data("float",  ["real",             Arrow::FloatArray,  [1.1, -2.2, 3.3]])
-  data("double", ["double precision", Arrow::DoubleArray, [1.1, -2.2, 3.3]])
-  data("string - text",    ["text",        Arrow::StringArray, ["a", "b", "c"]])
-  data("string - varchar", ["varchar(10)", Arrow::StringArray, ["a", "b", "c"]])
-  data("binary", ["bytea", Arrow::BinaryArray, ["\x0".b, "\x1".b, "\x2".b]])
+  timestamp_values = [
+    Time.parse("2023-08-02T18:03:13.526572321Z").utc,
+    Time.parse("2023-08-02T18:04:13.526572321Z").utc,
+    Time.parse("2023-08-02T18:05:13.526572321Z").utc,
+  ]
+
+  data("int8",   ["smallint", :int8,   [1, -2, 3]])
+  data("int16",  ["smallint", :int16,  [1, -2, 3]])
+  data("int32",  ["integer",  :int32,  [1, -2, 3]])
+  data("int64",  ["bigint",   :int64,  [1, -2, 3]])
+  data("uint8",  ["smallint", :uint8,  [1,  2, 3]])
+  data("uint16", ["smallint", :uint16, [1,  2, 3]])
+  data("uint32", ["integer",  :uint32, [1,  2, 3]])
+  data("uint64", ["bigint",   :uint64, [1,  2, 3]])
+  data("float",  ["real",             :float,  [1.1, -2.2, 3.3]])
+  data("double", ["double precision", :double, [1.1, -2.2, 3.3]])
+  data("string - text",    ["text",        :string, ["a", "b", "c"]])
+  data("string - varchar", ["varchar(10)", :string, ["a", "b", "c"]])
+  data("binary", ["bytea", :binary, ["\x0".b, "\x1".b, "\x2".b]])
+  data("timestamp(second)",
+       ["timestamp", [:timestamp, :second], timestamp_values])
+  data("timestamp(milli)",
+       ["timestamp", [:timestamp, :milli], timestamp_values])
+  data("timestamp(micro)",
+       ["timestamp", [:timestamp, :micro], timestamp_values])
+  data("timestamp(nano)",
+       ["timestamp", [:timestamp, :nano], timestamp_values])
   def test_insert_type
     unless flight_sql_client.respond_to?(:prepare)
       omit("red-arrow-flight-sql 14.0.0 or later is required")
     end
 
-    pg_type, array_class, values = data
+    pg_type, data_type, values = data
+    data_type = Arrow::DataType.resolve(data_type)
     run_sql("CREATE TABLE data (value #{pg_type})")
 
+    array = data_type.build_array(values)
     flight_sql_client.prepare("INSERT INTO data VALUES ($1)",
                               @options) do |statement|
-      array = array_class.new(values)
       statement.record_batch = Arrow::RecordBatch.new(value: array)
       n_changed_records = statement.execute_update(@options)
-      assert_equal(3, n_changed_records)
+      assert_equal(values.size, n_changed_records)
     end
 
+    case values.first
+    when Time
+      case data_type.unit
+      when Arrow::TimeUnit::SECOND
+        value_size = "1970-01-01 00:00:00".size
+        strftime_format = "%Y-%m-%d %H:%M:%S"
+      when Arrow::TimeUnit::MILLI
+        value_size = "1970-01-01 00:00:00.000".size
+        strftime_format = "%Y-%m-%d %H:%M:%S.%3N"
+      when Arrow::TimeUnit::MICRO, Arrow::TimeUnit::NANO
+        value_size = "1970-01-01 00:00:00.000000".size
+        strftime_format = "%Y-%m-%d %H:%M:%S.%6N"
+      else
+        raise "unsupported: #{data_type.unit.inspect}"
+      end
+    else
+      value_size = 5
+    end
     output = <<-RESULT
 SELECT * FROM data
- value 
--------
+ #{"value".center(value_size)} 
+-#{"-" * value_size}-
     RESULT
     values.each do |value|
       case value
@@ -151,6 +190,8 @@ SELECT * FROM data
         output << (" %5.1f\n" % value)
       when Integer
         output << (" %5d\n" % value)
+      when Time
+        output << " #{value.strftime(strftime_format)}\n"
       else
         if value.encoding == "".b.encoding
           output << " "

Reply via email to