This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 6eb5506cf feat(c/driver/postgresql): Support JSON and JSONB types 
(#2072)
6eb5506cf is described below

commit 6eb5506cf5b7df68e64032736ecf13358317ae2f
Author: Dewey Dunnington <[email protected]>
AuthorDate: Mon Aug 12 20:33:04 2024 -0300

    feat(c/driver/postgresql): Support JSON and JSONB types (#2072)
    
    This PR adds support for JSON and JSONB types. Before this PR, the raw
    COPY was returned: for JSON, this would have been the bytes of the
    JSON string. For JSONB, this would have been the bytes of the JSON
    string prefixed by `0x01`, which is the one and only version number of
    the JSONB COPY binary format.
    
    This PR routes it through the string type. We could also route this
    through the JSON canonical extension type by adding some metadata. I
    don't think an implementation of that type exists anywhere yet (but it
    might functionally be the same since pyarrow/Arrow C++ would just drop
    the extension metadata).
    
    The testing here is a bit repetitive...we could definitely improve on
    reducing duplication in the test cases 😬 .
    
    Closes #2068.
    
    Reproducer in R:
    
    ``` r
    library(adbcdrivermanager)
    #> Warning: package 'adbcdrivermanager' was built under R version 4.3.3
    
    con <- adbc_database_init(
      adbcpostgresql::adbcpostgresql(),
      uri = 
"postgresql://localhost:5432/postgres?user=postgres&password=password"
    ) |>
      adbc_connection_init()
    
    lots_of_json_url <- 
"https://github.com/apache/arrow-experiments/raw/main/data/arrow-commits/arrow-commits.jsonl";
    lines <- readLines(lots_of_json_url)
    
    con |>
      execute_adbc("DROP TABLE IF EXISTS much_json")
    
    data.frame(lines = lines) |>
      write_adbc(con, "much_json")
    
    con |>
      read_adbc("select lines::jsonb as lines from much_json") |>
      tibble::as_tibble()
    #> # A tibble: 15,487 × 1
    #>    lines
    #>    <chr>
    #>  1 "{\"time\": \"2024-03-07T02:00:52\", \"files\": 2, \"merge\": false, 
\"commi…
    #>  2 "{\"time\": \"2024-03-06T21:51:34\", \"files\": 1, \"merge\": false, 
\"commi…
    #>  3 "{\"time\": \"2024-03-06T20:29:15\", \"files\": 1, \"merge\": false, 
\"commi…
    #>  4 "{\"time\": \"2024-03-06T07:46:45\", \"files\": 1, \"merge\": false, 
\"commi…
    #>  5 "{\"time\": \"2024-03-05T16:13:32\", \"files\": 1, \"merge\": false, 
\"commi…
    #>  6 "{\"time\": \"2024-03-05T14:53:13\", \"files\": 20, \"merge\": false, 
\"comm…
    #>  7 "{\"time\": \"2024-03-05T12:31:38\", \"files\": 2, \"merge\": false, 
\"commi…
    #>  8 "{\"time\": \"2024-03-05T08:15:42\", \"files\": 6, \"merge\": false, 
\"commi…
    #>  9 "{\"time\": \"2024-03-05T07:56:25\", \"files\": 2, \"merge\": false, 
\"commi…
    #> 10 "{\"time\": \"2024-03-05T01:04:20\", \"files\": 1, \"merge\": false, 
\"commi…
    #> # ℹ 15,477 more rows
    ```
    
    <sup>Created on 2024-08-11 with [reprex
    v2.1.0](https://reprex.tidyverse.org)</sup>
---
 .../postgresql/copy/postgres_copy_reader_test.cc   | 80 ++++++++++++++++++++++
 .../postgresql/copy/postgres_copy_test_common.h    | 22 ++++++
 c/driver/postgresql/copy/reader.h                  | 45 ++++++++++++
 c/driver/postgresql/postgres_type.h                |  2 +
 4 files changed, 149 insertions(+)

diff --git a/c/driver/postgresql/copy/postgres_copy_reader_test.cc b/c/driver/postgresql/copy/postgres_copy_reader_test.cc
index 958df4a93..60e0b6aaf 100644
--- a/c/driver/postgresql/copy/postgres_copy_reader_test.cc
+++ b/c/driver/postgresql/copy/postgres_copy_reader_test.cc
@@ -620,6 +620,86 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadEnum) {
   ASSERT_EQ(std::string(data_buffer + 2, 3), "sad");
 }
 
+TEST(PostgresCopyUtilsTest, PostgresCopyReadJson) {
+  ArrowBufferView data;
+  data.data.as_uint8 = kTestPgCopyJson;
+  data.size_bytes = sizeof(kTestPgCopyJson);
+
+  auto col_type = PostgresType(PostgresTypeId::kJson);
+  PostgresType input_type(PostgresTypeId::kRecord);
+  input_type.AppendChild("col", col_type);
+
+  ArrowError error{};  // zero-initialized so a failure never prints garbage
+  PostgresCopyStreamTester tester;
+  ASSERT_EQ(tester.Init(input_type), NANOARROW_OK);
+  ASSERT_EQ(tester.ReadAll(&data, &error), ENODATA) << error.message;
+  ASSERT_EQ(data.data.as_uint8 - kTestPgCopyJson, sizeof(kTestPgCopyJson));
+  ASSERT_EQ(data.size_bytes, 0);
+
+  nanoarrow::UniqueArray array;
+  ASSERT_EQ(tester.GetArray(array.get(), &error), NANOARROW_OK) << error.message;
+  ASSERT_EQ(array->length, 3);
+  ASSERT_EQ(array->n_children, 1);
+
+  auto validity = reinterpret_cast<const uint8_t*>(array->children[0]->buffers[0]);
+  auto offsets = reinterpret_cast<const int32_t*>(array->children[0]->buffers[1]);
+  auto data_buffer = reinterpret_cast<const char*>(array->children[0]->buffers[2]);
+  ASSERT_NE(validity, nullptr);
+  ASSERT_NE(data_buffer, nullptr);
+  ASSERT_TRUE(ArrowBitGet(validity, 0));
+  ASSERT_TRUE(ArrowBitGet(validity, 1));
+  ASSERT_FALSE(ArrowBitGet(validity, 2));  // the NULL::json row
+
+  // json is copied as plain text, so each value is exactly the 9-byte JSON string
+  ASSERT_EQ(offsets[0], 0);
+  ASSERT_EQ(offsets[1], 9);
+  ASSERT_EQ(offsets[2], 18);
+  ASSERT_EQ(offsets[3], 18);  // NULL adds no bytes
+  ASSERT_EQ(std::string(data_buffer, 9), "[1, 2, 3]");
+  ASSERT_EQ(std::string(data_buffer + 9, 9), "[4, 5, 6]");
+}
+
+TEST(PostgresCopyUtilsTest, PostgresCopyReadJsonb) {
+  ArrowBufferView data;
+  data.data.as_uint8 = kTestPgCopyJsonb;
+  data.size_bytes = sizeof(kTestPgCopyJsonb);
+
+  auto col_type = PostgresType(PostgresTypeId::kJsonb);
+  PostgresType input_type(PostgresTypeId::kRecord);
+  input_type.AppendChild("col", col_type);
+
+  ArrowError error{};  // zero-initialized so a failure never prints garbage
+  PostgresCopyStreamTester tester;
+  ASSERT_EQ(tester.Init(input_type), NANOARROW_OK);
+  ASSERT_EQ(tester.ReadAll(&data, &error), ENODATA) << error.message;
+  ASSERT_EQ(data.data.as_uint8 - kTestPgCopyJsonb, sizeof(kTestPgCopyJsonb));
+  ASSERT_EQ(data.size_bytes, 0);
+
+  nanoarrow::UniqueArray array;
+  ASSERT_EQ(tester.GetArray(array.get(), &error), NANOARROW_OK) << error.message;
+  ASSERT_EQ(array->length, 3);
+  ASSERT_EQ(array->n_children, 1);
+
+  auto validity = reinterpret_cast<const uint8_t*>(array->children[0]->buffers[0]);
+  auto offsets = reinterpret_cast<const int32_t*>(array->children[0]->buffers[1]);
+  auto data_buffer = reinterpret_cast<const char*>(array->children[0]->buffers[2]);
+  ASSERT_NE(validity, nullptr);
+  ASSERT_NE(data_buffer, nullptr);
+
+  ASSERT_TRUE(ArrowBitGet(validity, 0));
+  ASSERT_TRUE(ArrowBitGet(validity, 1));
+  ASSERT_FALSE(ArrowBitGet(validity, 2));  // the NULL::jsonb row
+
+  // Each 10-byte field loses its 0x01 version prefix, leaving the 9-byte JSON text
+  ASSERT_EQ(offsets[0], 0);
+  ASSERT_EQ(offsets[1], 9);
+  ASSERT_EQ(offsets[2], 18);
+  ASSERT_EQ(offsets[3], 18);  // NULL adds no bytes
+
+  ASSERT_EQ(std::string(data_buffer, 9), "[1, 2, 3]");
+  ASSERT_EQ(std::string(data_buffer + 9, 9), "[4, 5, 6]");
+}
+
 TEST(PostgresCopyUtilsTest, PostgresCopyReadBinary) {
   ArrowBufferView data;
   data.data.as_uint8 = kTestPgCopyBinary;
diff --git a/c/driver/postgresql/copy/postgres_copy_test_common.h b/c/driver/postgresql/copy/postgres_copy_test_common.h
index 3c98d089c..8872ada6d 100644
--- a/c/driver/postgresql/copy/postgres_copy_test_common.h
+++ b/c/driver/postgresql/copy/postgres_copy_test_common.h
@@ -21,6 +21,10 @@
 
 namespace adbcpq {
 
+// New cases can be generated using:
+// psql --host 127.0.0.1 --port 5432 --username postgres -c "COPY (SELECT ...) TO STDOUT
+// WITH (FORMAT binary);" > test.copy && Rscript -e "dput(brio::read_file_raw('test.copy'))"
+
 // COPY (SELECT CAST("col" AS BOOLEAN) AS "col" FROM (  VALUES (TRUE), (FALSE), (NULL)) AS
 // drvd("col")) TO STDOUT;
 static const uint8_t kTestPgCopyBoolean[] = {
@@ -116,6 +120,24 @@ static const uint8_t kTestPgCopyText[] = {
     0x03, 0x61, 0x62, 0x63, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x31, 0x32,
     0x33, 0x34, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
 
+// COPY (SELECT CAST(col AS json) AS col FROM (VALUES ('[1, 2, 3]'), ('[4, 5, 6]'),
+// (NULL::json)) AS drvd(col)) TO STDOUT WITH (FORMAT binary);
+static const uint8_t kTestPgCopyJson[] = {
+    0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00,  // "PGCOPY\n\377\r\n\0"
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
+    0x09, 0x5b, 0x31, 0x2c, 0x20, 0x32, 0x2c, 0x20, 0x33, 0x5d, 0x00, 0x01,  // len 9, "[1, 2, 3]"
+    0x00, 0x00, 0x00, 0x09, 0x5b, 0x34, 0x2c, 0x20, 0x35, 0x2c, 0x20, 0x36,  // len 9, "[4, 5, 6
+    0x5d, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};  // "]", len -1 (NULL), 0xffff end
+
+// COPY (SELECT CAST(col AS jsonb) AS col FROM (VALUES ('[1, 2, 3]'), ('[4, 5, 6]'),
+// (NULL::jsonb)) AS drvd(col)) TO STDOUT WITH (FORMAT binary);
+static const uint8_t kTestPgCopyJsonb[] = {
+    0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00,  // "PGCOPY\n\377\r\n\0"
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
+    0x0a, 0x01, 0x5b, 0x31, 0x2c, 0x20, 0x32, 0x2c, 0x20, 0x33, 0x5d, 0x00,  // len 10: 0x01 + "[1, 2, 3]"
+    0x01, 0x00, 0x00, 0x00, 0x0a, 0x01, 0x5b, 0x34, 0x2c, 0x20, 0x35, 0x2c,  // len 10: 0x01 + "[4, 5,
+    0x20, 0x36, 0x5d, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};  // " 6]", len -1 (NULL), 0xffff end
+
 // COPY (SELECT CAST("col" AS BYTEA) AS "col" FROM (  VALUES (''), ('\x0001'),
 // ('\x01020304'), ('\xFEFF'), (NULL)) AS drvd("col")) TO STDOUT
 // WITH (FORMAT binary);
diff --git a/c/driver/postgresql/copy/reader.h b/c/driver/postgresql/copy/reader.h
index 371445a8e..983f39226 100644
--- a/c/driver/postgresql/copy/reader.h
+++ b/c/driver/postgresql/copy/reader.h
@@ -443,6 +443,47 @@ class PostgresCopyBinaryFieldReader : public PostgresCopyFieldReader {
   }
 };
 
+/// Postgres emits JSONB in binary COPY as the JSON text prefixed with a one-byte version
+/// number (https://github.com/postgres/postgres/blob/3f44959f47460fb350d25d760cf2384f9aa14e9a/src/backend/utils/adt/jsonb.c#L80-L87).
+/// Currently there is only one version, so functionally this is just a string prefixed
+/// with 0x01. Any other version byte, or a non-NULL field too short to hold it, is EINVAL.
+class PostgresCopyJsonbFieldReader : public PostgresCopyFieldReader {
+ public:
+  ArrowErrorCode Read(ArrowBufferView* data, int32_t field_size_bytes, ArrowArray* array,
+                      ArrowError* error) override {
+    // -1 for NULL; a non-NULL value always contains at least the version byte
+    if (field_size_bytes < 0) {
+      return ArrowArrayAppendNull(array, 1);
+    } else if (field_size_bytes == 0) {
+      ArrowErrorSet(error, "Expected at least 1 byte of JSONB field data but got 0");
+      return EINVAL;
+    } else if (field_size_bytes > data->size_bytes) {
+      ArrowErrorSet(error, "Expected %d bytes of field data but got %d bytes of input",
+                    static_cast<int>(field_size_bytes),
+                    static_cast<int>(data->size_bytes));  // NOLINT(runtime/int)
+      return EINVAL;
+    }
+
+    int8_t version;
+    NANOARROW_RETURN_NOT_OK(ReadChecked<int8_t>(data, &version, error));
+    if (version != 1) {
+      ArrowErrorSet(error, "Expected JSONB binary version 0x01 but got %d",
+                    static_cast<int>(version));
+      return EINVAL;  // rest of this field is unconsumed; the caller must not keep parsing
+    }
+
+    // Everything after the version byte is the JSON text, appended as a string value
+    field_size_bytes -= 1;
+    NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(data_, data->data.data, field_size_bytes));
+    data->data.as_uint8 += field_size_bytes;
+    data->size_bytes -= field_size_bytes;
+    int32_t* offsets = reinterpret_cast<int32_t*>(offsets_->data);
+    NANOARROW_RETURN_NOT_OK(
+        ArrowBufferAppendInt32(offsets_, offsets[array->length] + field_size_bytes));
+    return AppendValid(array);
+  }
+};
+
 class PostgresCopyArrayFieldReader : public PostgresCopyFieldReader {
  public:
   void InitChild(std::unique_ptr<PostgresCopyFieldReader> child) {
@@ -774,11 +815,15 @@ static inline ArrowErrorCode MakeCopyFieldReader(
         case PostgresTypeId::kBpchar:
         case PostgresTypeId::kName:
         case PostgresTypeId::kEnum:
+        case PostgresTypeId::kJson:
           *out = std::make_unique<PostgresCopyBinaryFieldReader>();
           return NANOARROW_OK;
         case PostgresTypeId::kNumeric:
           *out = std::make_unique<PostgresCopyNumericFieldReader>();
           return NANOARROW_OK;
+        case PostgresTypeId::kJsonb:
+          *out = std::make_unique<PostgresCopyJsonbFieldReader>();
+          return NANOARROW_OK;
         default:
           return ErrorCantConvert(error, pg_type, schema_view);
       }
diff --git a/c/driver/postgresql/postgres_type.h b/c/driver/postgresql/postgres_type.h
index c7cc55745..02748cf91 100644
--- a/c/driver/postgresql/postgres_type.h
+++ b/c/driver/postgresql/postgres_type.h
@@ -233,6 +233,8 @@ class PostgresType {
       case PostgresTypeId::kText:
       case PostgresTypeId::kName:
       case PostgresTypeId::kEnum:
+      case PostgresTypeId::kJson:
+      case PostgresTypeId::kJsonb:
         NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, 
NANOARROW_TYPE_STRING));
         break;
       case PostgresTypeId::kBytea:

Reply via email to