This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new d6b66fc feat(glib): add gadbc_statement_bind_stream() (#536)
d6b66fc is described below
commit d6b66fcc240d1a074d2c8361589c20f2fae727bc
Author: Sutou Kouhei <[email protected]>
AuthorDate: Thu Mar 23 11:20:49 2023 +0900
feat(glib): add gadbc_statement_bind_stream() (#536)
Fixes #525.
---
glib/adbc-glib/statement.c | 30 ++++++++++++++++++++++++++++++
glib/adbc-glib/statement.h | 3 +++
glib/test/test-statement.rb | 27 +++++++++++++++++++++++++++
ruby/lib/adbc/statement.rb | 43 ++++++++++++++++++++++++++++++++++---------
ruby/test/test-statement.rb | 15 +++++++++++++++
5 files changed, 109 insertions(+), 9 deletions(-)
diff --git a/glib/adbc-glib/statement.c b/glib/adbc-glib/statement.c
index fad52ef..36734a6 100644
--- a/glib/adbc-glib/statement.c
+++ b/glib/adbc-glib/statement.c
@@ -296,6 +296,36 @@ gboolean gadbc_statement_bind(GADBCStatement* statement,
gpointer c_abi_array,
return gadbc_error_check(error, status_code, &adbc_error, context);
}
+/**
+ * gadbc_statement_bind_stream:
+ * @statement: A #GADBCStatement.
+ * @c_abi_array_stream: A `struct ArrowArrayStream *` of record batches stream
+ * to bind. The driver will call the release callback itself, although
+ * it may not do this until the statement is released.
+ * @error: (out) (optional): Return location for a #GError or %NULL.
+ *
+ * Bind Arrow data stream. This can be used for bulk inserts or prepared
+ * statements.
+ *
+ * Returns: %TRUE if binding is done successfully, %FALSE
+ * otherwise.
+ *
+ * Since: 0.4.0
+ */
+gboolean gadbc_statement_bind_stream(GADBCStatement* statement,
+ gpointer c_abi_array_stream, GError**
error) {
+ const gchar* context = "[adbc][statement][bind-stream]";
+ struct AdbcStatement* adbc_statement =
+ gadbc_statement_get_raw(statement, context, error);
+ if (!adbc_statement) {
+ return FALSE;
+ }
+ struct AdbcError adbc_error = {};
+ AdbcStatusCode status_code =
+ AdbcStatementBindStream(adbc_statement, c_abi_array_stream, &adbc_error);
+ return gadbc_error_check(error, status_code, &adbc_error, context);
+}
+
/**
* gadbc_statement_execute:
* @statement: A #GADBCStatement.
diff --git a/glib/adbc-glib/statement.h b/glib/adbc-glib/statement.h
index 0b1de36..cf6a3d5 100644
--- a/glib/adbc-glib/statement.h
+++ b/glib/adbc-glib/statement.h
@@ -68,6 +68,9 @@ gboolean gadbc_statement_prepare(GADBCStatement* statement,
GError** error);
GADBC_AVAILABLE_IN_0_4
gboolean gadbc_statement_bind(GADBCStatement* statement, gpointer c_abi_array,
gpointer c_abi_schema, GError** error);
+GADBC_AVAILABLE_IN_0_4
+gboolean gadbc_statement_bind_stream(GADBCStatement* statement,
+ gpointer c_abi_array_stream, GError**
error);
GADBC_AVAILABLE_IN_0_1
gboolean gadbc_statement_execute(GADBCStatement* statement, gboolean
need_result,
gpointer* c_abi_array_stream, gint64*
n_rows_affected,
diff --git a/glib/test/test-statement.rb b/glib/test/test-statement.rb
index 242b4f5..8daf803 100644
--- a/glib/test/test-statement.rb
+++ b/glib/test/test-statement.rb
@@ -82,4 +82,31 @@ class StatementTest < Test::Unit::TestCase
table)
end
end
+
+ def test_bind_stream
+ @statement.set_sql_query("CREATE TABLE data (number int)")
+ execute_statement
+
+ record_batch =
+ Arrow::RecordBatch.new(number: Arrow::Int64Array.new([10, 20, 30]))
+ @statement.set_sql_query("INSERT INTO data VALUES (?)")
+ @statement.ingest_target_table = "data"
+ @statement.ingest_mode = :append
+ reader = Arrow::RecordBatchReader.new([record_batch])
+ c_abi_array_stream = reader.export
+ begin
+ @statement.bind_stream(c_abi_array_stream)
+ execute_statement(need_result: false) do |n_rows_affected|
+ assert_equal(3, n_rows_affected)
+ end
+ ensure
+ GLib.free(c_abi_array_stream)
+ end
+
+ @statement.set_sql_query("SELECT * FROM data")
+ execute_statement do |table, _n_rows_affected|
+ assert_equal(record_batch.to_table,
+ table)
+ end
+ end
end
diff --git a/ruby/lib/adbc/statement.rb b/ruby/lib/adbc/statement.rb
index ca3dfcf..90f4c23 100644
--- a/ruby/lib/adbc/statement.rb
+++ b/ruby/lib/adbc/statement.rb
@@ -55,6 +55,39 @@ module ADBC
end
end
+ alias_method :bind_raw, :bind
+ def bind(*args)
+ n_args = args.size
+ if block_given?
+ message = "wrong number of arguments (given #{n_args}, expected 1 with
block)"
+ raise ArgumentError, message unless n_args == 1
+ values = args[0]
+ if values.is_a?(Arrow::RecordBatchReader)
+ c_abi_array_stream = values.export
+ begin
+ bind_stream(c_abi_array_stream)
+ yield
+ ensure
+ GLib.free(c_abi_array_stream)
+ end
+ else
+ _, c_abi_array, c_abi_schema = values.export
+ begin
+ bind_raw(c_abi_array, c_abi_schema)
+ yield
+ ensure
+ begin
+ GLib.free(c_abi_array)
+ ensure
+ GLib.free(c_abi_schema)
+ end
+ end
+ end
+ else
+ bind_raw(*args)
+ end
+ end
+
def ingest(table_name, values, mode: :create)
insert = "INSERT INTO #{table_name} (" # TODO escape
fields = values.schema.fields
@@ -65,16 +98,8 @@ module ADBC
self.sql_query = insert
self.ingest_target_table = table_name
self.ingest_mode = mode
- _, c_abi_array, c_abi_schema = values.export
- begin
- bind(c_abi_array, c_abi_schema)
+ bind(values) do
execute(need_result: false)
- ensure
- begin
- GLib.free(c_abi_array)
- ensure
- GLib.free(c_abi_schema)
- end
end
end
diff --git a/ruby/test/test-statement.rb b/ruby/test/test-statement.rb
index d6f7075..cac5b94 100644
--- a/ruby/test/test-statement.rb
+++ b/ruby/test/test-statement.rb
@@ -46,4 +46,19 @@ class StatementTest < Test::Unit::TestCase
n_rows_affected,
])
end
+
+ def test_ingest_stream
+ numbers = Arrow::Int64Array.new([10, 20, 30])
+ record_batch = Arrow::RecordBatch.new(number: numbers)
+ @statement.ingest("data", Arrow::RecordBatchReader.new([record_batch]))
+ table, n_rows_affected = @statement.query("SELECT * FROM data")
+ assert_equal([
+ Arrow::Table.new(number: numbers),
+ -1,
+ ],
+ [
+ table,
+ n_rows_affected,
+ ])
+ end
end