This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/arrow-flight-sql-postgresql.git
The following commit(s) were added to refs/heads/main by this push:
new 7e76479 Add support for Timestamp (#75)
7e76479 is described below
commit 7e7647936c2de4cbee80a9a7df7926f9c8cae592
Author: Sutou Kouhei <[email protected]>
AuthorDate: Wed Aug 23 17:20:06 2023 +0900
Add support for Timestamp (#75)
Closes GH-57
---
src/afs.cc | 43 ++++++++++++++++++++++
test/helper/sandbox.rb | 1 +
test/test-flight-sql.rb | 97 +++++++++++++++++++++++++++++++++++--------------
3 files changed, 113 insertions(+), 28 deletions(-)
diff --git a/src/afs.cc b/src/afs.cc
index 3727a59..f977f52 100644
--- a/src/afs.cc
+++ b/src/afs.cc
@@ -40,6 +40,7 @@ extern "C"
#include <utils/guc.h>
#include <utils/memutils.h>
#include <utils/snapmgr.h>
+#include <utils/timestamp.h>
#include <utils/wait_event.h>
}
@@ -804,6 +805,12 @@ class ArrowPGTypeConverter : public arrow::TypeVisitor {
return arrow::Status::OK();
}
+ arrow::Status Visit(const arrow::TimestampType& type)
+ {
+ oid_ = TIMESTAMPOID;
+ return arrow::Status::OK();
+ }
+
private:
Oid oid_;
};
@@ -889,6 +896,34 @@ class ArrowPGValueConverter : public arrow::ArrayVisitor {
return arrow::Status::OK();
}
+ arrow::Status Visit(const arrow::TimestampArray& array)
+ {
+ const auto unit =
+ std::static_pointer_cast<arrow::TimestampType>(array.type())->unit();
+ Timestamp value = 0;
+ switch (unit)
+ {
+ case arrow::TimeUnit::SECOND:
+ value += array.Value(i_row_) * 1000000;
+ break;
+ case arrow::TimeUnit::MILLI:
+ value += array.Value(i_row_) * 1000;
+ break;
+ case arrow::TimeUnit::MICRO:
+ value += array.Value(i_row_);
+ break;
+ case arrow::TimeUnit::NANO:
+ value += array.Value(i_row_) / 1000;
+ break;
+ default:
+ return arrow::Status::NotImplemented("Unsupported time unit: ", unit);
+ }
+ // Arrow uses UNIX epoch (1970-01-01) but PostgreSQL uses 2000-01-01.
+ value -= (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY;
+ datum_ = TimestampGetDatum(value);
+ return arrow::Status::OK();
+ }
+
private:
int64_t i_row_;
Datum& datum_;
@@ -917,6 +952,8 @@ class PGArrowValueConverter : public arrow::ArrayVisitor {
return arrow::utf8();
case BYTEAOID:
return arrow::binary();
+ case TIMESTAMPOID:
+ return arrow::timestamp(arrow::TimeUnit::MICRO);
default:
return arrow::Status::NotImplemented("Unsupported PostgreSQL type: ",
attribute_->atttypid);
@@ -949,6 +986,12 @@ class PGArrowValueConverter : public arrow::ArrayVisitor {
case BYTEAOID:
return static_cast<arrow::BinaryBuilder*>(builder)->Append(
VARDATA_ANY(datum),
VARSIZE_ANY_EXHDR(datum));
+ case TIMESTAMPOID:
+ // Arrow uses UNIX epoch (1970-01-01) but PostgreSQL
+ // uses 2000-01-01.
+ return static_cast<arrow::TimestampBuilder*>(builder)->Append(
+ DatumGetTimestamp(datum) +
+ (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY);
default:
return arrow::Status::NotImplemented("Unsupported PostgreSQL type: ",
attribute_->atttypid);
diff --git a/test/helper/sandbox.rb b/test/helper/sandbox.rb
index 995a147..c11b2da 100644
--- a/test/helper/sandbox.rb
+++ b/test/helper/sandbox.rb
@@ -18,6 +18,7 @@
require "fileutils"
require "socket"
require "tempfile"
+require "time"
require "arrow-flight-sql"
diff --git a/test/test-flight-sql.rb b/test/test-flight-sql.rb
index 1a50d63..4465d56 100644
--- a/test/test-flight-sql.rb
+++ b/test/test-flight-sql.rb
@@ -31,6 +31,8 @@ class FlightSQLTest < Test::Unit::TestCase
def to_sql(value)
case value
+ when Time
+ "'#{value.dup.utc.strftime("%Y-%m-%d %H:%M:%S.%6N")}'"
when String
sql_string = "'"
value.each_char do |char|
@@ -54,17 +56,21 @@ class FlightSQLTest < Test::Unit::TestCase
end
end
- data("int16", ["smallint", Arrow::Int16Array, -2])
- data("int32", ["integer", Arrow::Int32Array, -2])
- data("int64", ["bigint", Arrow::Int64Array, -2])
- data("float", ["real", Arrow::FloatArray, -2.2])
- data("double", ["double precision", Arrow::DoubleArray, -2.2])
- data("string - text", ["text", Arrow::StringArray, "b"])
- data("string - varchar", ["varchar(10)", Arrow::StringArray, "b"])
- data("binary", ["bytea", Arrow::BinaryArray, "\x0".b])
+ timestamp_value = Time.parse("2023-08-02T18:03:13.526572321Z").utc
+
+ data("int16", ["smallint", :int16, -2])
+ data("int32", ["integer", :int32, -2])
+ data("int64", ["bigint", :int64, -2])
+ data("float", ["real", :float, -2.2])
+ data("double", ["double precision", :double, -2.2])
+ data("string - text", ["text", :string, "b"])
+ data("string - varchar", ["varchar(10)", :string, "b"])
+ data("binary", ["bytea", :binary, "\x0".b])
+ data("timestamp", ["timestamp", [:timestamp, :micro], timestamp_value])
def test_select_type
- pg_type, array_class, value = data
- values = array_class.new([value])
+ pg_type, data_type, value = data
+ data_type = Arrow::DataType.resolve(data_type)
+ values = data_type.build_array([value])
sql = "SELECT #{to_sql(value)}::#{pg_type} AS value"
info = flight_sql_client.execute(sql, @options)
assert_equal(Arrow::Schema.new(value: values.value_data_type),
@@ -111,39 +117,72 @@ SELECT * FROM data
RESULT
end
- data("int8", ["smallint", Arrow::Int8Array, [1, -2, 3]])
- data("int16", ["smallint", Arrow::Int16Array, [1, -2, 3]])
- data("int32", ["integer", Arrow::Int32Array, [1, -2, 3]])
- data("int64", ["bigint", Arrow::Int64Array, [1, -2, 3]])
- data("uint8", ["smallint", Arrow::UInt8Array, [1, 2, 3]])
- data("uint16", ["smallint", Arrow::UInt16Array, [1, 2, 3]])
- data("uint32", ["integer", Arrow::UInt32Array, [1, 2, 3]])
- data("uint64", ["bigint", Arrow::UInt64Array, [1, 2, 3]])
- data("float", ["real", Arrow::FloatArray, [1.1, -2.2, 3.3]])
- data("double", ["double precision", Arrow::DoubleArray, [1.1, -2.2, 3.3]])
- data("string - text", ["text", Arrow::StringArray, ["a", "b", "c"]])
- data("string - varchar", ["varchar(10)", Arrow::StringArray, ["a", "b", "c"]])
- data("binary", ["bytea", Arrow::BinaryArray, ["\x0".b, "\x1".b, "\x2".b]])
+ timestamp_values = [
+ Time.parse("2023-08-02T18:03:13.526572321Z").utc,
+ Time.parse("2023-08-02T18:04:13.526572321Z").utc,
+ Time.parse("2023-08-02T18:05:13.526572321Z").utc,
+ ]
+
+ data("int8", ["smallint", :int8, [1, -2, 3]])
+ data("int16", ["smallint", :int16, [1, -2, 3]])
+ data("int32", ["integer", :int32, [1, -2, 3]])
+ data("int64", ["bigint", :int64, [1, -2, 3]])
+ data("uint8", ["smallint", :uint8, [1, 2, 3]])
+ data("uint16", ["smallint", :uint16, [1, 2, 3]])
+ data("uint32", ["integer", :uint32, [1, 2, 3]])
+ data("uint64", ["bigint", :uint64, [1, 2, 3]])
+ data("float", ["real", :float, [1.1, -2.2, 3.3]])
+ data("double", ["double precision", :double, [1.1, -2.2, 3.3]])
+ data("string - text", ["text", :string, ["a", "b", "c"]])
+ data("string - varchar", ["varchar(10)", :string, ["a", "b", "c"]])
+ data("binary", ["bytea", :binary, ["\x0".b, "\x1".b, "\x2".b]])
+ data("timestamp(second)",
+ ["timestamp", [:timestamp, :second], timestamp_values])
+ data("timestamp(milli)",
+ ["timestamp", [:timestamp, :milli], timestamp_values])
+ data("timestamp(micro)",
+ ["timestamp", [:timestamp, :micro], timestamp_values])
+ data("timestamp(nano)",
+ ["timestamp", [:timestamp, :nano], timestamp_values])
def test_insert_type
unless flight_sql_client.respond_to?(:prepare)
omit("red-arrow-flight-sql 14.0.0 or later is required")
end
- pg_type, array_class, values = data
+ pg_type, data_type, values = data
+ data_type = Arrow::DataType.resolve(data_type)
run_sql("CREATE TABLE data (value #{pg_type})")
+ array = data_type.build_array(values)
flight_sql_client.prepare("INSERT INTO data VALUES ($1)",
@options) do |statement|
- array = array_class.new(values)
statement.record_batch = Arrow::RecordBatch.new(value: array)
n_changed_records = statement.execute_update(@options)
- assert_equal(3, n_changed_records)
+ assert_equal(values.size, n_changed_records)
end
+ case values.first
+ when Time
+ case data_type.unit
+ when Arrow::TimeUnit::SECOND
+ value_size = "1970-01-01 00:00:00".size
+ strftime_format = "%Y-%m-%d %H:%M:%S"
+ when Arrow::TimeUnit::MILLI
+ value_size = "1970-01-01 00:00:00.000".size
+ strftime_format = "%Y-%m-%d %H:%M:%S.%3N"
+ when Arrow::TimeUnit::MICRO, Arrow::TimeUnit::NANO
+ value_size = "1970-01-01 00:00:00.000000".size
+ strftime_format = "%Y-%m-%d %H:%M:%S.%6N"
+ else
+ raise "unsupported: #{data_type.unit.inspect}"
+ end
+ else
+ value_size = 5
+ end
output = <<-RESULT
SELECT * FROM data
- value
--------
+ #{"value".center(value_size)}
+-#{"-" * value_size}-
RESULT
values.each do |value|
case value
@@ -151,6 +190,8 @@ SELECT * FROM data
output << (" %5.1f\n" % value)
when Integer
output << (" %5d\n" % value)
+ when Time
+ output << " #{value.strftime(strftime_format)}\n"
else
if value.encoding == "".b.encoding
output << " "