This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 963b57bb feat(c/driver/postgresql): TIMESTAMP COPY Writer (#1185)
963b57bb is described below

commit 963b57bbf254f30aec090104d06e99defb60240d
Author: William Ayd <[email protected]>
AuthorDate: Wed Oct 11 14:16:59 2023 -0400

    feat(c/driver/postgresql): TIMESTAMP COPY Writer (#1185)
---
 c/driver/postgresql/postgres_copy_reader.h       |  80 +++++++++++++--
 c/driver/postgresql/postgres_copy_reader_test.cc | 122 +++++++++++++++++++++++
 2 files changed, 196 insertions(+), 6 deletions(-)

diff --git a/c/driver/postgresql/postgres_copy_reader.h b/c/driver/postgresql/postgres_copy_reader.h
index 21b08d31..3a26442c 100644
--- a/c/driver/postgresql/postgres_copy_reader.h
+++ b/c/driver/postgresql/postgres_copy_reader.h
@@ -28,6 +28,7 @@
 
 #include <nanoarrow/nanoarrow.hpp>
 
+#include "common/utils.h"
 #include "postgres_type.h"
 #include "postgres_util.h"
 
@@ -1211,10 +1212,61 @@ class PostgresCopyBinaryFieldWriter : public PostgresCopyFieldWriter {
   }
 };
 
-static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType 
arrow_type,
-                                                 PostgresCopyFieldWriter** out,
-                                                 ArrowError* error) {
-  switch (arrow_type) {
+template <enum ArrowTimeUnit TU>
+class PostgresCopyTimestampFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) 
override {
+    constexpr int32_t field_size_bytes = sizeof(int64_t);
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, 
error));
+
+    int64_t raw_value = ArrowArrayViewGetIntUnsafe(array_view_, index);
+    int64_t value;
+
+    bool overflow_safe = true;
+    switch (TU) {
+      case NANOARROW_TIME_UNIT_SECOND:
+        if ((overflow_safe = raw_value <= kMaxSafeSecondsToMicros &&
+             raw_value >= kMinSafeSecondsToMicros)) {
+          value = raw_value * 1000000;
+        }
+        break;
+      case NANOARROW_TIME_UNIT_MILLI:
+        if ((overflow_safe = raw_value <= kMaxSafeMillisToMicros &&
+             raw_value >= kMinSafeMillisToMicros)) {
+          value = raw_value * 1000;
+        }
+        break;
+      case NANOARROW_TIME_UNIT_MICRO:
+        value = raw_value;
+        break;
+      case NANOARROW_TIME_UNIT_NANO:
+        value = raw_value / 1000;
+        break;
+    }
+
+    if (!overflow_safe) {
+      ArrowErrorSet(
+          error,
+          "Row %" PRId64 " timestamp value %" PRId64 " with unit %d would 
overflow",
+          index,
+          raw_value,
+          TU);
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+
+    // 2000-01-01 00:00:00.000000 in microseconds
+    constexpr int64_t kPostgresTimestampEpoch = 946684800000000;
+    const int64_t scaled = value - kPostgresTimestampEpoch;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int64_t>(buffer, scaled, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(
+    const struct ArrowSchemaView& schema_view, PostgresCopyFieldWriter** out,
+    ArrowError* error) {
+  switch (schema_view.type) {
     case NANOARROW_TYPE_BOOL:
       *out = new PostgresCopyBooleanFieldWriter();
       return NANOARROW_OK;
@@ -1243,6 +1295,23 @@ static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type
     case NANOARROW_TYPE_LARGE_STRING:
       *out = new PostgresCopyBinaryFieldWriter();
       return NANOARROW_OK;
+    case NANOARROW_TYPE_TIMESTAMP: {
+        switch (schema_view.time_unit) {
+          case NANOARROW_TIME_UNIT_NANO:
+            *out = new 
PostgresCopyTimestampFieldWriter<NANOARROW_TIME_UNIT_NANO>();
+            break;
+          case NANOARROW_TIME_UNIT_MILLI:
+            *out = new 
PostgresCopyTimestampFieldWriter<NANOARROW_TIME_UNIT_MILLI>();
+            break;
+          case NANOARROW_TIME_UNIT_MICRO:
+            *out = new 
PostgresCopyTimestampFieldWriter<NANOARROW_TIME_UNIT_MICRO>();
+            break;
+          case NANOARROW_TIME_UNIT_SECOND:
+            *out = new 
PostgresCopyTimestampFieldWriter<NANOARROW_TIME_UNIT_SECOND>();
+            break;
+        }
+        return NANOARROW_OK;
+    }
     default:
       return EINVAL;
   }
@@ -1293,9 +1362,8 @@ class PostgresCopyStreamWriter {
           NANOARROW_OK) {
         return ADBC_STATUS_INTERNAL;
       }
-      const ArrowType arrow_type = schema_view.type;
       PostgresCopyFieldWriter* child_writer;
-      NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(arrow_type, &child_writer, 
error));
+      NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(schema_view, &child_writer, 
error));
       
root_writer_.AppendChild(std::unique_ptr<PostgresCopyFieldWriter>(child_writer));
     }
 
diff --git a/c/driver/postgresql/postgres_copy_reader_test.cc b/c/driver/postgresql/postgres_copy_reader_test.cc
index 1fd80124..e4bbb9ea 100644
--- a/c/driver/postgresql/postgres_copy_reader_test.cc
+++ b/c/driver/postgresql/postgres_copy_reader_test.cc
@@ -16,7 +16,9 @@
 // under the License.
 
 #include <optional>
+#include <tuple>
 
+#include <gtest/gtest-param-test.h>
 #include <gtest/gtest.h>
 #include <nanoarrow/nanoarrow.hpp>
 
@@ -678,6 +680,126 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadNumeric) {
   EXPECT_EQ(std::string(item.data, item.size_bytes), "inf");
 }
 
+
+// COPY (SELECT CAST(col AS TIMESTAMP) FROM (  VALUES ('1900-01-01 12:34:56'),
+// ('2100-01-01 12:34:56'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT BINARY);
+static uint8_t kTestPgCopyTimestamp[] = {
+    0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
+    0x00, 0x08, 0xff, 0xf4, 0xc9, 0xf9, 0x07, 0xe5, 0x9c, 0x00, 0x00,
+    0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0b, 0x36, 0x30, 0x2d, 0xa5,
+    0xfc, 0x00, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
+
+TEST(PostgresCopyUtilsTest, PostgresCopyReadTimestamp) {
+  ArrowBufferView data;
+  data.data.as_uint8 = kTestPgCopyTimestamp;
+  data.size_bytes = sizeof(kTestPgCopyTimestamp);
+
+  auto col_type = PostgresType(PostgresTypeId::kTimestamp);
+  PostgresType input_type(PostgresTypeId::kRecord);
+  input_type.AppendChild("col", col_type);
+
+  PostgresCopyStreamTester tester;
+  ASSERT_EQ(tester.Init(input_type), NANOARROW_OK);
+  ASSERT_EQ(tester.ReadAll(&data), ENODATA);
+  ASSERT_EQ(data.data.as_uint8 - kTestPgCopyTimestamp, 
sizeof(kTestPgCopyTimestamp));
+  ASSERT_EQ(data.size_bytes, 0);
+
+  nanoarrow::UniqueArray array;
+  ASSERT_EQ(tester.GetArray(array.get()), NANOARROW_OK);
+  ASSERT_EQ(array->length, 3);
+  ASSERT_EQ(array->n_children, 1);
+
+  auto validity = reinterpret_cast<const 
uint8_t*>(array->children[0]->buffers[0]);
+  auto data_buffer = reinterpret_cast<const 
int64_t*>(array->children[0]->buffers[1]);
+  ASSERT_NE(validity, nullptr);
+  ASSERT_NE(data_buffer, nullptr);
+
+  ASSERT_TRUE(ArrowBitGet(validity, 0));
+  ASSERT_TRUE(ArrowBitGet(validity, 1));
+  ASSERT_FALSE(ArrowBitGet(validity, 3));
+
+  ASSERT_EQ(data_buffer[0], -2208943504000000);
+  ASSERT_EQ(data_buffer[1], 4102490096000000);
+}
+
+using TimestampTestParamType = std::tuple<enum ArrowTimeUnit,
+                                          const char *,
+                                          std::vector<std::optional<int64_t>>>;
+
+class PostgresCopyWriteTimestampTest : public testing::TestWithParam<
+  TimestampTestParamType> {
+};
+
+TEST_P(PostgresCopyWriteTimestampTest, WritesProperBufferValues) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+
+  TimestampTestParamType parameters = GetParam();
+  enum ArrowTimeUnit unit = std::get<0>(parameters);
+  const char* timezone = std::get<1>(parameters);
+
+  const std::vector<std::optional<int64_t>> values = std::get<2>(parameters);
+
+  ArrowSchemaInit(&schema.value);
+  ArrowSchemaSetTypeStruct(&schema.value, 1);
+  ArrowSchemaSetTypeDateTime(schema->children[0],
+                             NANOARROW_TYPE_TIMESTAMP,
+                             unit,
+                             timezone);
+  ArrowSchemaSetName(schema->children[0], "col");
+  ASSERT_EQ(adbc_validation::MakeBatch<int64_t>(&schema.value,
+                                                &array.value,
+                                                &na_error,
+                                                values),
+              ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+  ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+  const struct ArrowBuffer buf = tester.WriteBuffer();
+  // The last 2 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  constexpr size_t buf_size = sizeof(kTestPgCopyTimestamp) - 2;
+  ASSERT_EQ(buf.size_bytes, buf_size);
+  for (size_t i = 0; i < buf_size; i++) {
+    ASSERT_EQ(buf.data[i], kTestPgCopyTimestamp[i]);
+  }
+}
+
+static const std::vector<TimestampTestParamType> ts_values {
+  {NANOARROW_TIME_UNIT_SECOND, nullptr,
+   {-2208943504, 4102490096, std::nullopt}},
+  {NANOARROW_TIME_UNIT_MILLI, nullptr,
+   {-2208943504000, 4102490096000, std::nullopt}},
+  {NANOARROW_TIME_UNIT_MICRO, nullptr,
+   {-2208943504000000, 4102490096000000, std::nullopt}},
+  {NANOARROW_TIME_UNIT_NANO, nullptr,
+   {-2208943504000000000, 4102490096000000000, std::nullopt}},
+  {NANOARROW_TIME_UNIT_SECOND, "UTC",
+   {-2208943504, 4102490096, std::nullopt}},
+  {NANOARROW_TIME_UNIT_MILLI, "UTC",
+   {-2208943504000, 4102490096000, std::nullopt}},
+  {NANOARROW_TIME_UNIT_MICRO, "UTC",
+   {-2208943504000000, 4102490096000000, std::nullopt}},
+  {NANOARROW_TIME_UNIT_NANO, "UTC",
+   {-2208943504000000000, 4102490096000000000, std::nullopt}},
+  {NANOARROW_TIME_UNIT_SECOND, "America/New_York",
+   {-2208943504, 4102490096, std::nullopt}},
+  {NANOARROW_TIME_UNIT_MILLI, "America/New_York",
+   {-2208943504000, 4102490096000, std::nullopt}},
+  {NANOARROW_TIME_UNIT_MICRO, "America/New_York",
+   {-2208943504000000, 4102490096000000, std::nullopt}},
+  {NANOARROW_TIME_UNIT_NANO, "America/New_York",
+   {-2208943504000000000, 4102490096000000000, std::nullopt}},
+};
+
+INSTANTIATE_TEST_SUITE_P(PostgresCopyWriteTimestamp,
+                         PostgresCopyWriteTimestampTest,
+                         testing::ValuesIn(ts_values));
+
 // COPY (SELECT CAST("col" AS TEXT) AS "col" FROM (  VALUES ('abc'), ('1234'),
 // (NULL::text)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
 static uint8_t kTestPgCopyText[] = {

Reply via email to