This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new d6b66fc  feat(glib): add gadbc_statement_bind_stream() (#536)
d6b66fc is described below

commit d6b66fcc240d1a074d2c8361589c20f2fae727bc
Author: Sutou Kouhei <[email protected]>
AuthorDate: Thu Mar 23 11:20:49 2023 +0900

    feat(glib): add gadbc_statement_bind_stream() (#536)
    
    Fixes #525.
---
 glib/adbc-glib/statement.c  | 30 ++++++++++++++++++++++++++++++
 glib/adbc-glib/statement.h  |  3 +++
 glib/test/test-statement.rb | 27 +++++++++++++++++++++++++++
 ruby/lib/adbc/statement.rb  | 43 ++++++++++++++++++++++++++++++++++---------
 ruby/test/test-statement.rb | 15 +++++++++++++++
 5 files changed, 109 insertions(+), 9 deletions(-)

diff --git a/glib/adbc-glib/statement.c b/glib/adbc-glib/statement.c
index fad52ef..36734a6 100644
--- a/glib/adbc-glib/statement.c
+++ b/glib/adbc-glib/statement.c
@@ -296,6 +296,36 @@ gboolean gadbc_statement_bind(GADBCStatement* statement, gpointer c_abi_array,
   return gadbc_error_check(error, status_code, &adbc_error, context);
 }
 
+/**
+ * gadbc_statement_bind_stream:
+ * @statement: A #GADBCStatement.
+ * @c_abi_array_stream: A `struct ArrowArrayStream *` of record batches stream
+ *   to bind. The driver will call the release callback itself, although
+ *   it may not do this until the statement is released.
+ * @error: (out) (optional): Return location for a #GError or %NULL.
+ *
+ * Bind Arrow data stream. This can be used for bulk inserts or prepared
+ * statements.
+ *
+ * Returns: %TRUE if binding is done successfully, %FALSE
+ *   otherwise.
+ *
+ * Since: 0.4.0
+ */
+gboolean gadbc_statement_bind_stream(GADBCStatement* statement,
+                                     gpointer c_abi_array_stream, GError** error) {
+  const gchar* context = "[adbc][statement][bind-stream]";
+  struct AdbcStatement* adbc_statement =
+      gadbc_statement_get_raw(statement, context, error);
+  if (!adbc_statement) {
+    return FALSE;
+  }
+  struct AdbcError adbc_error = {};
+  AdbcStatusCode status_code =
+      AdbcStatementBindStream(adbc_statement, c_abi_array_stream, &adbc_error);
+  return gadbc_error_check(error, status_code, &adbc_error, context);
+}
+
 /**
  * gadbc_statement_execute:
  * @statement: A #GADBCStatement.
diff --git a/glib/adbc-glib/statement.h b/glib/adbc-glib/statement.h
index 0b1de36..cf6a3d5 100644
--- a/glib/adbc-glib/statement.h
+++ b/glib/adbc-glib/statement.h
@@ -68,6 +68,9 @@ gboolean gadbc_statement_prepare(GADBCStatement* statement, GError** error);
 GADBC_AVAILABLE_IN_0_4
 gboolean gadbc_statement_bind(GADBCStatement* statement, gpointer c_abi_array,
                               gpointer c_abi_schema, GError** error);
+GADBC_AVAILABLE_IN_0_4
+gboolean gadbc_statement_bind_stream(GADBCStatement* statement,
+                                     gpointer c_abi_array_stream, GError** error);
 GADBC_AVAILABLE_IN_0_1
 gboolean gadbc_statement_execute(GADBCStatement* statement, gboolean need_result,
                                  gpointer* c_abi_array_stream, gint64* n_rows_affected,
diff --git a/glib/test/test-statement.rb b/glib/test/test-statement.rb
index 242b4f5..8daf803 100644
--- a/glib/test/test-statement.rb
+++ b/glib/test/test-statement.rb
@@ -82,4 +82,31 @@ class StatementTest < Test::Unit::TestCase
                    table)
     end
   end
+
+  def test_bind_stream
+    @statement.set_sql_query("CREATE TABLE data (number int)")
+    execute_statement
+
+    record_batch =
+      Arrow::RecordBatch.new(number: Arrow::Int64Array.new([10, 20, 30]))
+    @statement.set_sql_query("INSERT INTO data VALUES (?)")
+    @statement.ingest_target_table = "data"
+    @statement.ingest_mode = :append
+    reader = Arrow::RecordBatchReader.new([record_batch])
+    c_abi_array_stream = reader.export
+    begin
+      @statement.bind_stream(c_abi_array_stream)
+      execute_statement(need_result: false) do |n_rows_affected|
+        assert_equal(3, n_rows_affected)
+      end
+    ensure
+      GLib.free(c_abi_array_stream)
+    end
+
+    @statement.set_sql_query("SELECT * FROM data")
+    execute_statement do |table, _n_rows_affected|
+      assert_equal(record_batch.to_table,
+                   table)
+    end
+  end
 end
diff --git a/ruby/lib/adbc/statement.rb b/ruby/lib/adbc/statement.rb
index ca3dfcf..90f4c23 100644
--- a/ruby/lib/adbc/statement.rb
+++ b/ruby/lib/adbc/statement.rb
@@ -55,6 +55,39 @@ module ADBC
       end
     end
 
+    alias_method :bind_raw, :bind
+    def bind(*args)
+      n_args = args.size
+      if block_given?
+        message = "wrong number of arguments (given #{n_args}, expected 1 with block)"
+        raise ArgumentError, message unless n_args == 1
+        values = args[0]
+        if values.is_a?(Arrow::RecordBatchReader)
+          c_abi_array_stream = values.export
+          begin
+            bind_stream(c_abi_array_stream)
+            yield
+          ensure
+            GLib.free(c_abi_array_stream)
+          end
+        else
+          _, c_abi_array, c_abi_schema = values.export
+          begin
+            bind_raw(c_abi_array, c_abi_schema)
+            yield
+          ensure
+            begin
+              GLib.free(c_abi_array)
+            ensure
+              GLib.free(c_abi_schema)
+            end
+          end
+        end
+      else
+        bind_raw(*args)
+      end
+    end
+
     def ingest(table_name, values, mode: :create)
       insert = "INSERT INTO #{table_name} (" # TODO escape
       fields = values.schema.fields
@@ -65,16 +98,8 @@ module ADBC
       self.sql_query = insert
       self.ingest_target_table = table_name
       self.ingest_mode = mode
-      _, c_abi_array, c_abi_schema = values.export
-      begin
-        bind(c_abi_array, c_abi_schema)
+      bind(values) do
         execute(need_result: false)
-      ensure
-        begin
-          GLib.free(c_abi_array)
-        ensure
-          GLib.free(c_abi_schema)
-        end
       end
     end
 
diff --git a/ruby/test/test-statement.rb b/ruby/test/test-statement.rb
index d6f7075..cac5b94 100644
--- a/ruby/test/test-statement.rb
+++ b/ruby/test/test-statement.rb
@@ -46,4 +46,19 @@ class StatementTest < Test::Unit::TestCase
                    n_rows_affected,
                  ])
   end
+
+  def test_ingest_stream
+    numbers = Arrow::Int64Array.new([10, 20, 30])
+    record_batch = Arrow::RecordBatch.new(number: numbers)
+    @statement.ingest("data", Arrow::RecordBatchReader.new([record_batch]))
+    table, n_rows_affected = @statement.query("SELECT * FROM data")
+    assert_equal([
+                   Arrow::Table.new(number: numbers),
+                   -1,
+                 ],
+                 [
+                   table,
+                   n_rows_affected,
+                 ])
+  end
 end

Reply via email to