ShravanSunder commented on issue #3293:
URL: https://github.com/apache/arrow-adbc/issues/3293#issuecomment-3224368139

   Hi @lidavidm,
   
   I can share how I tested my solution. It exercises the kind of data I expected the driver to handle for JSONB (Arrow's `arrow.json` extension type).
   
   ```python
   import typing as t
   
   import pyarrow as pa
   import pytest
   from bills_ai.helpers.adbc_arrow_shim import (
       prepare_arrow_table_for_pg_adbc,
   )
   from pydantic_core import from_json
   from voyager_lib.basemodel.arrow_table_base_model import ArrowTableBaseModel
   from voyager_lib.database.configure_voyager_db import get_adbc_cursor
   
   
   class EmbeddingRecord(ArrowTableBaseModel):
       id: int
       embedding: t.List[float]
       note: str
   
   
   def generate_vectors(n_rows: int, dim: int) -> t.List[t.List[float]]:
       # Deterministic values without numpy
       scale = 0.001
       out: t.List[t.List[float]] = []
       for i in range(n_rows):
           row: t.List[float] = [float(((i * 131 + j * 17) % 1000) * scale) for j in range(dim)]
           out.append(row)
       return out
   
   
   def coerce_id_dist(rows: t.Any) -> t.List[tuple[int, float]]:  # noqa: ANN401
       result: t.List[tuple[int, float]] = []
       typed_rows = t.cast(t.List[t.Tuple[t.Any, t.Any]], rows)
       for r in typed_rows:
           result.append((int(r[0]), float(r[1])))
       return result
   
   
   @pytest.fixture
   def setup_pgvector_tables(bills_voyager_db_session: None) -> t.Iterator[int]:
       dim = 1024
       with get_adbc_cursor() as cur:
           cur.execute("BEGIN")
           cur.execute("CREATE SCHEMA IF NOT EXISTS bills")
           cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
           cur.execute("DROP TABLE IF EXISTS emb_arrow")
           cur.execute(f"CREATE TABLE emb_arrow (id BIGINT PRIMARY KEY, 
embedding VECTOR({dim}), note TEXT)")
   
           cur.execute("COMMIT")
       try:
           yield dim
       finally:
           with get_adbc_cursor() as cur:
               cur.execute("DROP TABLE IF EXISTS emb_arrow")
               cur.execute("COMMIT")
   
   
   @pytest.mark.integration_db
   def test_adbc_ingest_and_similarity_via_arrowtable_bytes(setup_pgvector_tables: int) -> None:  # noqa: PLR0915
       # Arrange
       dim = setup_pgvector_tables
       n_rows = 8
   
       # Generate test data with id=1 having special note
       all_vectors = generate_vectors(n_rows, dim)
       rows: t.List[EmbeddingRecord] = []
       for i, vec in enumerate(all_vectors):
           note = "note1" if i == 0 else "first_batch"
           rows.append(EmbeddingRecord(id=i + 1, embedding=vec, note=note))
   
       first_batch_table = EmbeddingRecord.pydantic_list_to_arrow_table(rows)
       first_batch_converted = prepare_arrow_table_for_pg_adbc(first_batch_table)
   
       # Second batch excludes id=1 to preserve note1
       second_batch_vectors = generate_vectors(n_rows, dim)
       second_batch_rows = [
           EmbeddingRecord(id=i + 1, embedding=vec, note="second_batch")
           for i, vec in enumerate(second_batch_vectors)
           if i != 0
       ]
       second_batch_table = EmbeddingRecord.pydantic_list_to_arrow_table(second_batch_rows)
       second_batch_converted = prepare_arrow_table_for_pg_adbc(second_batch_table)
   
       # Act - Insert first batch
       assert first_batch_converted.schema.field("embedding").type == pa.binary()
       assert second_batch_converted.schema.field("embedding").type == pa.binary()
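       # (the shim presumably serializes each vector into pgvector's binary COPY
       # layout: int16 dimension, int16 flags, then float4 elements, so that
       # adbc_ingest can write it into the VECTOR column)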
   
       with get_adbc_cursor() as cur:
           try:
               cur.execute("BEGIN")
               cur.execute("DROP TABLE IF EXISTS emb_arrow_staging")
               cur.execute("CREATE TEMP TABLE emb_arrow_staging (LIKE emb_arrow 
INCLUDING DEFAULTS) ON COMMIT DROP")
               cur.adbc_ingest("emb_arrow_staging", first_batch_converted, 
mode="append", temporary=True)
   
               cur.execute(
                   """
                   DELETE FROM emb_arrow t
                   USING emb_arrow_staging s
                   WHERE t.id = s.id;
                   """,
               )
               cur.execute(
                   """
                   INSERT INTO emb_arrow
                   SELECT * FROM emb_arrow_staging;
                   """,
               )
               cur.execute("COMMIT")
           except Exception:
               cur.execute("ROLLBACK")
               raise
   
       # Act - Insert second batch (partial upsert)
       with get_adbc_cursor() as cur:
           try:
               cur.execute("BEGIN")
               cur.execute("DROP TABLE IF EXISTS emb_arrow_staging")
               cur.execute("CREATE TEMP TABLE emb_arrow_staging (LIKE emb_arrow 
INCLUDING DEFAULTS) ON COMMIT DROP")
               cur.adbc_ingest("emb_arrow_staging", second_batch_converted, 
mode="append", temporary=True)
   
               cur.execute(
                   """
                   DELETE FROM emb_arrow t
                   USING emb_arrow_staging s
                   WHERE t.id = s.id;
                   """,
               )
               cur.execute(
                   """
                   INSERT INTO emb_arrow
                   SELECT * FROM emb_arrow_staging;
                   """,
               )
               cur.execute("COMMIT")
           except Exception:
               cur.execute("ROLLBACK")
               raise
   
       # Assert
       with get_adbc_cursor() as cur:
           cur.execute("SELECT COUNT(*) FROM emb_arrow")
           count = cur.fetchone()[0]  # pyright: ignore[reportUnknownVariableType,reportOptionalSubscript]
           assert count == n_rows, f"Expected {n_rows} records, got {count}"
   
           # Verify id=1 kept note1, others have second_batch
           cur.execute("SELECT note FROM emb_arrow WHERE id = 1")
           id1_note = cur.fetchone()[0]  # pyright: ignore[reportUnknownVariableType,reportOptionalSubscript]
           assert id1_note == "note1", f"Expected id=1 to have 'note1', got '{id1_note}'"
   
           cur.execute("SELECT DISTINCT note FROM emb_arrow WHERE id > 1 ORDER 
BY note")
           other_notes = [row[0] for row in cur.fetchall()]  # pyright: ignore[reportUnknownVariableType]
           assert other_notes == ["second_batch"], f"Expected other records to have 'second_batch', got {other_notes}"
   
           # Verify similarity search works
           cur.execute(
               """
               SELECT e2.id,
                      (SELECT embedding FROM emb_arrow WHERE id = 1) <-> e2.embedding AS dist
               FROM emb_arrow e2
               ORDER BY dist ASC, id ASC
               """,
           )
           out = coerce_id_dist(cur.fetchall())
           ids_by_dist = [r[0] for r in out]
           assert ids_by_dist[0] == 1
           assert len(ids_by_dist) == n_rows
   
   
   # Type aliases for database result tuples
   JsonbTestRow = tuple[int, t.Dict[str, t.Any], str]
   CountRow = tuple[int]
   IdRow = tuple[int]
   
   
   class JsonbTestRecord(ArrowTableBaseModel):
       """Test model that creates extension<arrow.json> schema type."""
   
       id: int
       jsonb_data: t.Dict[str, t.Any]  # This will create extension<arrow.json>
       name: str
   
   
   @pytest.mark.integration_db
   def test_adbc_jsonb_arrow_extension_works() -> None:
       """Test that Arrow extension<arrow.json> works with PostgreSQL JSONB via 
ADBC using the shim."""
       # Arrange
       # Create test data with a Dict field that produces an extension<arrow.json> schema
       test_data = [
           JsonbTestRecord(
               id=1,
               jsonb_data={"key": "value", "array": [1, 2, 3], "nested": 
{"deep": True}},
               name="test_record",
           ),
           JsonbTestRecord(
               id=2,
               jsonb_data={"another": "json", "number": 42},
               name="second_record",
           ),
       ]
   
       # Create Arrow table - this will have extension<arrow.json> schema
       arrow_table = JsonbTestRecord.pydantic_list_to_arrow_table(test_data)
   
       # Verify original has extension<arrow.json> type
       assert str(arrow_table.schema.field("jsonb_data").type) == "extension<arrow.json>"  # pyright: ignore[reportUnknownArgumentType]
   
       converted_table = prepare_arrow_table_for_pg_adbc(arrow_table)
   
       # Act - Verify conversion to binary with 0x01 prefix
       assert converted_table.schema.field("jsonb_data").type == pa.binary()
       # Check the binary format has 0x01 prefix (JSONB version byte)
       binary_data = converted_table["jsonb_data"][0].as_py()
       assert binary_data.startswith(b"\x01"), "JSONB binary should start with version byte 0x01"
   
       with get_adbc_cursor() as cur:
           cur.execute("CREATE SCHEMA IF NOT EXISTS bills")
           cur.execute("DROP TABLE IF EXISTS jsonb_test")
           cur.execute("CREATE TEMP TABLE jsonb_test (id BIGINT PRIMARY KEY, 
jsonb_data JSONB, name TEXT)")
   
           # This should now work with the converted table
           cur.adbc_ingest("jsonb_test", converted_table, mode="append", 
temporary=True)
   
           # Assert
           # Verify data was inserted correctly
           cur.execute("SELECT COUNT(*) FROM jsonb_test")
           count_result = t.cast(CountRow | None, cur.fetchone())
           assert count_result is not None
           count = count_result[0]
           assert count == 2, f"Expected 2 records, got {count}"
   
           # Verify JSON data is properly stored and queryable
           cur.execute("SELECT id, jsonb_data, name FROM jsonb_test ORDER BY 
id")
           rows = t.cast(t.List[JsonbTestRow], cur.fetchall())
   
           # Check first record
           first_row = rows[0]
           assert first_row[0] == 1
           assert first_row[2] == "test_record"
           first_json = first_row[1]
   
           # ADBC returns JSONB as string, parse it
           if isinstance(first_json, str):
               first_json = from_json(first_json)
   
           assert first_json["key"] == "value"
           assert first_json["array"] == [1, 2, 3]
           assert first_json["nested"]["deep"] is True
   
           # Check second record
           second_row = rows[1]
           assert second_row[0] == 2
           assert second_row[2] == "second_record"
           second_json = second_row[1]
   
           # ADBC returns JSONB as string, parse it
           if isinstance(second_json, str):
               second_json = from_json(second_json)
   
           assert second_json["another"] == "json"
           assert second_json["number"] == 42
   
           # Test JSON querying capabilities
           cur.execute("SELECT id FROM jsonb_test WHERE jsonb_data->>'key' = 
'value'")
           json_query_result = t.cast(IdRow | None, cur.fetchone())
           assert json_query_result is not None
           assert json_query_result[0] == 1
   
   
   ```
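   
   For context, here is roughly what the JSONB half of `prepare_arrow_table_for_pg_adbc` does. This is a minimal sketch, not the shim itself; it assumes PostgreSQL's JSONB wire format (a 0x01 version byte followed by UTF-8 JSON text), which is exactly what the assertions above check for:
   
   ```python
   import json
   import typing as t
   
   import pyarrow as pa
   
   
   def convert_json_extension_to_jsonb_binary(table: pa.Table) -> pa.Table:
       """Rewrite extension<arrow.json> columns as pa.binary() columns holding
       PostgreSQL's JSONB wire format: a 0x01 version byte + UTF-8 JSON text."""
       for i, field in enumerate(table.schema):
           if str(field.type) != "extension<arrow.json>":
               continue
           # to_pylist() may yield raw JSON strings (the storage) or parsed
           # objects depending on the pyarrow version, so handle both.
           values: t.List[t.Any] = table.column(i).to_pylist()
           encoded = [
               None
               if v is None
               else b"\x01" + (v if isinstance(v, str) else json.dumps(v)).encode("utf-8")
               for v in values
           ]
           table = table.set_column(
               i, pa.field(field.name, pa.binary()), pa.array(encoded, type=pa.binary())
           )
       return table
   ```
   
   Once the column is plain `pa.binary()`, the driver can presumably pass the bytes straight through in binary COPY mode, which is why the 0x01 prefix matters: without it PostgreSQL rejects the value as an unsupported JSONB version.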

