This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 7986d2e07 feat(java/driver/jni): add executeUpdate, prepare,
bulkIngest (#3966)
7986d2e07 is described below
commit 7986d2e076035d2c311f340223e4d6a0bd86b68e
Author: Tornike Gurgenidze <[email protected]>
AuthorDate: Fri Feb 13 12:40:13 2026 +0400
feat(java/driver/jni): add executeUpdate, prepare, bulkIngest (#3966)
- Added the bindings needed for executeUpdate, statementPrepare, and
bulkIngest.
- Also added the @Test annotation that was missing from the queryParams test.
Part of #3257.
---
java/driver/jni/src/main/cpp/jni_wrapper.cc | 43 +++++++++
.../arrow/adbc/driver/jni/JniConnection.java | 36 +++++++
.../apache/arrow/adbc/driver/jni/JniStatement.java | 5 +-
.../arrow/adbc/driver/jni/impl/JniLoader.java | 13 +++
.../arrow/adbc/driver/jni/impl/NativeAdbc.java | 6 ++
.../arrow/adbc/driver/jni/JniDriverTest.java | 103 +++++++++++++++++++++
6 files changed, 204 insertions(+), 2 deletions(-)
diff --git a/java/driver/jni/src/main/cpp/jni_wrapper.cc
b/java/driver/jni/src/main/cpp/jni_wrapper.cc
index 7613743dd..fdfea15b0 100644
--- a/java/driver/jni/src/main/cpp/jni_wrapper.cc
+++ b/java/driver/jni/src/main/cpp/jni_wrapper.cc
@@ -402,4 +402,47 @@
Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementBindStream(
e.ThrowJavaException(env);
}
}
+
+// JNI entry point for NativeAdbc.statementExecuteUpdate(long handle).
+// Runs the statement through AdbcStatementExecuteQuery without requesting a
+// result stream and returns the rows-affected count written by the driver.
+JNIEXPORT jlong JNICALL
+Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementExecuteUpdate(
+ JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
+ try {
+ struct AdbcError error = ADBC_ERROR_INIT;
+ // The jlong handle carries the native AdbcStatement pointer.
+ auto* ptr = reinterpret_cast<struct
AdbcStatement*>(static_cast<uintptr_t>(handle));
+ int64_t rows_affected = 0;
+ // out == nullptr: no result stream is requested, only the count is read.
+ // NOTE(review): CHECK_ADBC_ERROR presumably throws AdbcException on a
+ // non-OK status (macro not shown here) -- confirm.
+ CHECK_ADBC_ERROR(
+ AdbcStatementExecuteQuery(ptr, /*out=*/nullptr, &rows_affected,
&error), error);
+ return static_cast<jlong>(rows_affected);
+ } catch (const AdbcException& e) {
+ // Hand the failure to the Java caller as an exception.
+ e.ThrowJavaException(env);
+ }
+ // Only reached on the error path, after ThrowJavaException was called.
+ return -1;
+}
+
+// JNI entry point for NativeAdbc.statementPrepare(long handle).
+// Forwards to AdbcStatementPrepare for the statement behind `handle`.
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementPrepare(
+ JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) {
+ try {
+ struct AdbcError error = ADBC_ERROR_INIT;
+ // The jlong handle carries the native AdbcStatement pointer.
+ auto* ptr = reinterpret_cast<struct
AdbcStatement*>(static_cast<uintptr_t>(handle));
+ CHECK_ADBC_ERROR(AdbcStatementPrepare(ptr, &error), error);
+ } catch (const AdbcException& e) {
+ // Hand the failure to the Java caller as an exception.
+ e.ThrowJavaException(env);
+ }
+}
+
+// JNI entry point for NativeAdbc.statementSetOption(long handle, String key,
+// String value). Forwards a string option to AdbcStatementSetOption.
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementSetOption(
+ JNIEnv* env, [[maybe_unused]] jclass self, jlong handle, jstring key,
jstring value) {
+ try {
+ struct AdbcError error = ADBC_ERROR_INIT;
+ // The jlong handle carries the native AdbcStatement pointer.
+ auto* ptr = reinterpret_cast<struct
AdbcStatement*>(static_cast<uintptr_t>(handle));
+ // NOTE(review): JniStringView is defined elsewhere; presumably it exposes
+ // the Java string as a C string via `.value` for the scope of this call,
+ // and its handling of a null jstring is not visible here -- confirm.
+ JniStringView key_str(env, key);
+ JniStringView value_str(env, value);
+ CHECK_ADBC_ERROR(AdbcStatementSetOption(ptr, key_str.value,
value_str.value, &error),
+ error);
+ } catch (const AdbcException& e) {
+ // Hand the failure to the Java caller as an exception.
+ e.ThrowJavaException(env);
+ }
+}
}
diff --git
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
index e16f8c861..7c1b9a4a6 100644
---
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
+++
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java
@@ -20,8 +20,10 @@ package org.apache.arrow.adbc.driver.jni;
import org.apache.arrow.adbc.core.AdbcConnection;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.core.BulkIngestMode;
import org.apache.arrow.adbc.driver.jni.impl.JniLoader;
import org.apache.arrow.adbc.driver.jni.impl.NativeConnectionHandle;
+import org.apache.arrow.adbc.driver.jni.impl.NativeStatementHandle;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -40,6 +42,40 @@ public class JniConnection implements AdbcConnection {
return new JniStatement(allocator,
JniLoader.INSTANCE.openStatement(handle));
}
+  /**
+   * Opens a statement that is pre-configured for bulk ingestion.
+   *
+   * <p>The returned statement already has the {@code adbc.ingest.target_table} and {@code
+   * adbc.ingest.mode} options set. If configuring the statement fails, the native statement is
+   * closed before the exception propagates.
+   *
+   * @param targetTableName the table to ingest into
+   * @param mode how the target table is created, appended to, or replaced
+   * @return a statement with the ingest options applied
+   * @throws AdbcException if opening the statement or setting an option fails
+   * @throws NullPointerException if {@code targetTableName} or {@code mode} is null
+   * @throws IllegalArgumentException if {@code mode} is not a recognized ingest mode
+   */
+  @Override
+  public AdbcStatement bulkIngest(String targetTableName, BulkIngestMode mode)
+      throws AdbcException {
+    // Validate before opening the native statement so a null argument neither allocates a
+    // handle that must be cleaned up nor is passed through to the JNI layer.
+    java.util.Objects.requireNonNull(targetTableName, "targetTableName");
+    java.util.Objects.requireNonNull(mode, "mode");
+
+    NativeStatementHandle stmtHandle = JniLoader.INSTANCE.openStatement(handle);
+    try {
+      String modeValue;
+      switch (mode) {
+        case CREATE:
+          modeValue = "adbc.ingest.mode.create";
+          break;
+        case APPEND:
+          modeValue = "adbc.ingest.mode.append";
+          break;
+        case REPLACE:
+          modeValue = "adbc.ingest.mode.replace";
+          break;
+        case CREATE_APPEND:
+          modeValue = "adbc.ingest.mode.create_append";
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown bulk ingest mode: " + mode);
+      }
+
+      JniLoader.INSTANCE.statementSetOption(
+          stmtHandle, "adbc.ingest.target_table", targetTableName);
+      JniLoader.INSTANCE.statementSetOption(stmtHandle, "adbc.ingest.mode", modeValue);
+
+      return new JniStatement(allocator, stmtHandle);
+    } catch (Exception e) {
+      // Release the native statement, but never let a cleanup failure replace the error that
+      // actually caused the ingest setup to fail; attach it as a suppressed exception instead.
+      try {
+        stmtHandle.close();
+      } catch (Exception closeError) {
+        e.addSuppressed(closeError);
+      }
+      throw e;
+    }
+  }
+
@Override
public ArrowReader getInfo(int @Nullable [] infoCodes) throws AdbcException {
throw new UnsupportedOperationException();
diff --git
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
index 9e69698d5..b496049e1 100644
---
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
+++
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java
@@ -69,12 +69,13 @@ public class JniStatement implements AdbcStatement {
@Override
public UpdateResult executeUpdate() throws AdbcException {
- throw new UnsupportedOperationException();
+ // Delegate to the native binding and wrap the count it returns.
+ long rowsAffected = JniLoader.INSTANCE.statementExecuteUpdate(handle);
+ return new UpdateResult(rowsAffected);
}
@Override
public void prepare() throws AdbcException {
- throw new UnsupportedOperationException();
+ // Delegate to the native statementPrepare binding for this statement's handle.
+ JniLoader.INSTANCE.statementPrepare(handle);
}
@Override
diff --git
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
index 9180c37fd..db8142867 100644
---
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
+++
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java
@@ -105,4 +105,17 @@ public enum JniLoader {
NativeAdbc.statementBind(
statement.getStatementHandle(), batch.memoryAddress(),
schema.memoryAddress());
}
+
+ /**
+ * Executes the statement through the native binding.
+ *
+ * @param statement handle whose native statement pointer is passed to {@code NativeAdbc}
+ * @return the value returned by {@code NativeAdbc.statementExecuteUpdate}
+ * @throws AdbcException as declared by the native call
+ */
+ public long statementExecuteUpdate(NativeStatementHandle statement) throws
AdbcException {
+ return NativeAdbc.statementExecuteUpdate(statement.getStatementHandle());
+ }
+
+ /**
+ * Prepares the statement through the native binding.
+ *
+ * @param statement handle whose native statement pointer is passed to {@code NativeAdbc}
+ * @throws AdbcException as declared by the native call
+ */
+ public void statementPrepare(NativeStatementHandle statement) throws
AdbcException {
+ NativeAdbc.statementPrepare(statement.getStatementHandle());
+ }
+
+ /**
+ * Sets a string-valued option on the statement through the native binding.
+ *
+ * @param statement handle whose native statement pointer is passed to {@code NativeAdbc}
+ * @param key option name, forwarded unchanged
+ * @param value option value, forwarded unchanged
+ * @throws AdbcException as declared by the native call
+ */
+ public void statementSetOption(NativeStatementHandle statement, String key,
String value)
+ throws AdbcException {
+ NativeAdbc.statementSetOption(statement.getStatementHandle(), key, value);
+ }
}
diff --git
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
index e9f652e60..4e839c0b6 100644
---
a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
+++
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java
@@ -45,4 +45,10 @@ class NativeAdbc {
// isn't a very general interface)
@SuppressWarnings("unused")
static native void statementBindStream(long handle, long stream) throws
AdbcException;
+
+ // Implemented in jni_wrapper.cc; returns the rows-affected count obtained from
+ // AdbcStatementExecuteQuery.
+ static native long statementExecuteUpdate(long handle) throws AdbcException;
+
+ // Implemented in jni_wrapper.cc; forwards to AdbcStatementPrepare.
+ static native void statementPrepare(long handle) throws AdbcException;
+
+ // Implemented in jni_wrapper.cc; forwards the key/value pair to AdbcStatementSetOption.
+ static native void statementSetOption(long handle, String key, String value)
throws AdbcException;
}
diff --git
a/java/driver/jni/src/test/java/org/apache/arrow/adbc/driver/jni/JniDriverTest.java
b/java/driver/jni/src/test/java/org/apache/arrow/adbc/driver/jni/JniDriverTest.java
index 98e844b10..94fd9394e 100644
---
a/java/driver/jni/src/test/java/org/apache/arrow/adbc/driver/jni/JniDriverTest.java
+++
b/java/driver/jni/src/test/java/org/apache/arrow/adbc/driver/jni/JniDriverTest.java
@@ -33,6 +33,7 @@ import org.apache.arrow.adbc.core.AdbcDatabase;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.core.AdbcStatusCode;
+import org.apache.arrow.adbc.core.BulkIngestMode;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
@@ -175,6 +176,7 @@ class JniDriverTest {
}
}
+ @Test
void queryParams() throws Exception {
final Schema paramSchema =
new Schema(Collections.singletonList(Field.nullable("",
Types.MinorType.BIGINT.getType())));
@@ -205,4 +207,105 @@ class JniDriverTest {
}
}
}
+
+ // Exercises executeUpdate() through the JNI driver with the SQLite driver:
+ // creates a table, inserts three rows and checks the reported count, then
+ // confirms the row count with a SELECT COUNT(*).
+ @Test
+ void executeUpdate() throws Exception {
+ try (final BufferAllocator allocator = new RootAllocator()) {
+ JniDriver driver = new JniDriver(allocator);
+ Map<String, Object> parameters = new HashMap<>();
+ JniDriver.PARAM_DRIVER.set(parameters, "adbc_driver_sqlite");
+
+ try (final AdbcDatabase db = driver.open(parameters);
+ final AdbcConnection conn = db.connect()) {
+ // Create table (the update result is not checked for the DDL statement)
+ try (final AdbcStatement stmt = conn.createStatement()) {
+ stmt.setSqlQuery("CREATE TABLE test_update (id INTEGER, name TEXT)");
+ stmt.executeUpdate();
+ }
+
+ // Insert rows and check the affected-row count reported by the driver
+ try (final AdbcStatement stmt = conn.createStatement()) {
+ stmt.setSqlQuery("INSERT INTO test_update VALUES (1, 'a'), (2, 'b'),
(3, 'c')");
+ AdbcStatement.UpdateResult result = stmt.executeUpdate();
+ assertThat(result.getAffectedRows()).isEqualTo(3L);
+ }
+
+ // Verify data was inserted
+ try (final AdbcStatement stmt = conn.createStatement()) {
+ stmt.setSqlQuery("SELECT COUNT(*) FROM test_update");
+ try (final AdbcStatement.QueryResult result = stmt.executeQuery()) {
+ assertThat(result.getReader().loadNextBatch()).isTrue();
+
assertThat(result.getReader().getVectorSchemaRoot().getVector(0).getObject(0))
+ .isEqualTo(3L);
+ }
+ }
+ }
+ }
+ }
+
+ // Exercises prepare() through the JNI driver with the SQLite driver: prepares
+ // a parameterized query, binds a single BIGINT value, and checks the result.
+ @Test
+ void preparedStatement() throws Exception {
+ // One nullable BIGINT column with an empty name, used for the bound parameter.
+ final Schema paramSchema =
+ new Schema(Collections.singletonList(Field.nullable("",
Types.MinorType.BIGINT.getType())));
+ try (final BufferAllocator allocator = new RootAllocator()) {
+ JniDriver driver = new JniDriver(allocator);
+ Map<String, Object> parameters = new HashMap<>();
+ JniDriver.PARAM_DRIVER.set(parameters, "adbc_driver_sqlite");
+
+ try (final AdbcDatabase db = driver.open(parameters);
+ final AdbcConnection conn = db.connect();
+ final AdbcStatement stmt = conn.createStatement();
+ final VectorSchemaRoot root = VectorSchemaRoot.create(paramSchema,
allocator)) {
+ stmt.setSqlQuery("SELECT 1 + ?");
+ stmt.prepare();
+
+ // Bind one row holding 41, so the query is expected to yield 42.
+ ((BigIntVector) root.getVector(0)).setSafe(0, 41);
+ root.setRowCount(1);
+ stmt.bind(root);
+
+ try (final AdbcStatement.QueryResult result = stmt.executeQuery()) {
+ assertThat(result.getReader().loadNextBatch()).isTrue();
+
assertThat(result.getReader().getVectorSchemaRoot().getVector(0).getObject(0))
+ .isEqualTo(42L);
+ }
+ }
+ }
+ }
+
+ // Exercises bulkIngest() through the JNI driver with the SQLite driver:
+ // ingests three BIGINT rows into a new table, checks the reported count, then
+ // reads the table back and checks the row count of the first batch.
+ @Test
+ void bulkIngest() throws Exception {
+ // One nullable BIGINT column named "v".
+ final Schema schema =
+ new Schema(
+ Collections.singletonList(Field.nullable("v",
Types.MinorType.BIGINT.getType())));
+ try (final BufferAllocator allocator = new RootAllocator()) {
+ JniDriver driver = new JniDriver(allocator);
+ Map<String, Object> parameters = new HashMap<>();
+ JniDriver.PARAM_DRIVER.set(parameters, "adbc_driver_sqlite");
+
+ try (final AdbcDatabase db = driver.open(parameters);
+ final AdbcConnection conn = db.connect()) {
+ // Bulk ingest with CREATE mode
+ try (final AdbcStatement stmt = conn.bulkIngest("bulk_test",
BulkIngestMode.CREATE);
+ final VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ ((BigIntVector) root.getVector(0)).setSafe(0, 1);
+ ((BigIntVector) root.getVector(0)).setSafe(1, 2);
+ ((BigIntVector) root.getVector(0)).setSafe(2, 3);
+ root.setRowCount(3);
+
+ stmt.bind(root);
+ AdbcStatement.UpdateResult result = stmt.executeUpdate();
+ assertThat(result.getAffectedRows()).isEqualTo(3L);
+ }
+
+ // Verify data (only the row count is asserted, not the individual values)
+ try (final AdbcStatement stmt = conn.createStatement()) {
+ stmt.setSqlQuery("SELECT * FROM bulk_test ORDER BY v ASC");
+ try (final AdbcStatement.QueryResult result = stmt.executeQuery()) {
+ assertThat(result.getReader().loadNextBatch()).isTrue();
+
assertThat(result.getReader().getVectorSchemaRoot().getRowCount()).isEqualTo(3);
+ }
+ }
+ }
+ }
+ }
}