This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 6fa8cd5 Refactor Java APIs for clarity (#39)
6fa8cd5 is described below
commit 6fa8cd54341b073c956634d707ed22727d7d7581
Author: David Li <[email protected]>
AuthorDate: Fri Jul 15 13:56:39 2022 -0400
Refactor Java APIs for clarity (#39)
---
.../org/apache/arrow/adbc/core/AdbcConnection.java | 12 ++--
.../org/apache/arrow/adbc/core/AdbcDatabase.java | 8 ++-
.../org/apache/arrow/adbc/core/AdbcDriver.java | 8 ++-
.../org/apache/arrow/adbc/core/AdbcStatement.java | 15 ++++-
.../{AdbcDatabase.java => BulkIngestMode.java} | 14 ++--
.../core/{AdbcDriver.java => package-info.java} | 11 ++--
.../adbc/drivermanager/AdbcDriverManager.java | 2 +-
.../arrow/adbc/driver/jdbc/JdbcConnection.java | 6 +-
.../apache/arrow/adbc/driver/jdbc/JdbcDriver.java | 2 +-
.../arrow/adbc/driver/jdbc/JdbcStatement.java | 74 ++++++++++++++--------
.../driver/jdbc/JdbcConnectionMetadataTest.java | 2 +-
.../arrow/adbc/driver/jdbc/JdbcConnectionTest.java | 2 +-
.../arrow/adbc/driver/jdbc/JdbcStatementTest.java | 2 +-
.../adbc/driver/jdbc/JdbcTransactionTest.java | 2 +-
.../testsuite/AbstractConnectionMetadataTest.java | 5 +-
.../driver/testsuite/AbstractStatementTest.java | 68 +++++++++++++++++---
.../driver/testsuite/AbstractTransactionTest.java | 7 +-
17 files changed, 170 insertions(+), 70 deletions(-)
diff --git
a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
index 2a1e2b8..5f11a88 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
@@ -38,13 +38,13 @@ public interface AdbcConnection extends AutoCloseable {
/**
* Create a new statement to bulk insert a {@link VectorSchemaRoot} into a
table.
*
- * <p>Bind data to the statement, then call {@link AdbcStatement#execute()}.
The table will be
- * created if it does not exist. Otherwise data will be appended.
<tt>execute()</tt> will throw
- * AdbcException with status {@link AdbcStatusCode#ALREADY_EXISTS} if the
schema of the bound data
- * does not match the table schema.
+ * <p>Bind data to the statement, then call {@link AdbcStatement#execute()}.
See {@link
+ * BulkIngestMode} for description of behavior around creating tables.
*/
- default AdbcStatement bulkIngest(String targetTableName) throws
AdbcException {
- throw new UnsupportedOperationException("Connection does not support
bulkIngest(String)");
+ default AdbcStatement bulkIngest(String targetTableName, BulkIngestMode mode)
+ throws AdbcException {
+ throw new UnsupportedOperationException(
+ "Connection does not support bulkIngest(String, BulkIngestMode)");
}
/**
diff --git
a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDatabase.java
b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDatabase.java
index ce34aeb..e63c598 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDatabase.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDatabase.java
@@ -17,7 +17,13 @@
package org.apache.arrow.adbc.core;
-/** An instance of a database (e.g. a handle to an in-memory database). */
+/**
+ * An instance of a database (e.g. a handle to an in-memory database).
+ *
+ * <p>A database can have multiple open connections. While this is likely
redundant structure for
+ * remote/networked databases, for in-memory databases, this object provides
an explicit point of
+ * ownership.
+ */
public interface AdbcDatabase extends AutoCloseable {
/** Create a new connection to the database. */
AdbcConnection connect() throws AdbcException;
diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
index 619353c..fc3f5e4 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
@@ -19,6 +19,12 @@ package org.apache.arrow.adbc.core;
import java.util.Map;
+/** A handle to an ADBC database driver. */
public interface AdbcDriver {
- AdbcDatabase connect(Map<String, String> parameters) throws AdbcException;
+ /**
+ * Open a database via this driver.
+ *
+ * @param parameters Driver-specific parameters.
+ */
+ AdbcDatabase open(Map<String, String> parameters) throws AdbcException;
}
diff --git
a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
index 6b464e9..7b62ef0 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java
@@ -25,7 +25,7 @@ import org.apache.arrow.vector.ipc.ArrowReader;
public interface AdbcStatement extends AutoCloseable {
/** Set a generic query option. */
- default void setOption(String key, String value) {
+ default void setOption(String key, Object value) {
throw new UnsupportedOperationException("Unsupported option " + key);
}
@@ -52,9 +52,19 @@ public interface AdbcStatement extends AutoCloseable {
throw new UnsupportedOperationException("Statement does not support bind");
}
- /** Execute the query. */
+ /**
+ * Execute the query.
+ *
+ * <p>Usually you will want to use {@link #executeQuery()}.
+ */
void execute() throws AdbcException;
+ /** Execute a result set-generating query and get the result. */
+ default ArrowReader executeQuery() throws AdbcException {
+ execute();
+ return getArrowReader();
+ }
+
/**
* Get the result of executing a query.
*
@@ -75,7 +85,6 @@ public interface AdbcStatement extends AutoCloseable {
* @return The list of descriptors, or an empty list if unsupported.
*/
default List<PartitionDescriptor> getPartitionDescriptors() {
- // TODO: throw UnsupportedOperationException instead?
return Collections.emptyList();
}
diff --git
a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDatabase.java
b/java/core/src/main/java/org/apache/arrow/adbc/core/BulkIngestMode.java
similarity index 65%
copy from java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDatabase.java
copy to java/core/src/main/java/org/apache/arrow/adbc/core/BulkIngestMode.java
index ce34aeb..2ab16ac 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDatabase.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/BulkIngestMode.java
@@ -17,8 +17,14 @@
package org.apache.arrow.adbc.core;
-/** An instance of a database (e.g. a handle to an in-memory database). */
-public interface AdbcDatabase extends AutoCloseable {
- /** Create a new connection to the database. */
- AdbcConnection connect() throws AdbcException;
+/** How to handle already-existing/nonexistent tables for bulk ingest
operations. */
+public enum BulkIngestMode {
+ /** Create the table and insert data; error if the table exists. */
+ CREATE,
+ /**
+ * Do not create the table and append data; error if the table does not
exist ({@link
+ * AdbcStatusCode#NOT_FOUND}) or does not match the schema of the data to
append ({@link
+ * AdbcStatusCode#ALREADY_EXISTS}).
+ */
+ APPEND,
}
diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
b/java/core/src/main/java/org/apache/arrow/adbc/core/package-info.java
similarity index 85%
copy from java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
copy to java/core/src/main/java/org/apache/arrow/adbc/core/package-info.java
index 619353c..d6c8df3 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/package-info.java
@@ -15,10 +15,9 @@
* limitations under the License.
*/
+/**
+ * ADBC (Arrow Database Connectivity) is an API standard for Arrow-based
database access.
+ *
+ * <p>ADBC is currently experimental.
+ */
package org.apache.arrow.adbc.core;
-
-import java.util.Map;
-
-public interface AdbcDriver {
- AdbcDatabase connect(Map<String, String> parameters) throws AdbcException;
-}
diff --git
a/java/driver-manager/src/main/java/org/apache/arrow/adbc/drivermanager/AdbcDriverManager.java
b/java/driver-manager/src/main/java/org/apache/arrow/adbc/drivermanager/AdbcDriverManager.java
index 0c8c8f6..b197bf5 100644
---
a/java/driver-manager/src/main/java/org/apache/arrow/adbc/drivermanager/AdbcDriverManager.java
+++
b/java/driver-manager/src/main/java/org/apache/arrow/adbc/drivermanager/AdbcDriverManager.java
@@ -49,7 +49,7 @@ public final class AdbcDriverManager {
throw new AdbcException(
"Driver not found for '" + driverName + "'", null,
AdbcStatusCode.NOT_FOUND, null, 0);
}
- return driver.connect(parameters);
+ return driver.open(parameters);
}
/**
diff --git
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java
b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java
index 9559c30..0bc8fa3 100644
---
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java
+++
b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java
@@ -27,6 +27,7 @@ import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.adbc.core.AdbcConnection;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.core.BulkIngestMode;
import org.apache.arrow.adbc.core.StandardSchemas;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -60,8 +61,9 @@ public class JdbcConnection implements AdbcConnection {
}
@Override
- public AdbcStatement bulkIngest(String targetTableName) throws AdbcException
{
- return JdbcStatement.ingestRoot(allocator, connection, targetTableName);
+ public AdbcStatement bulkIngest(String targetTableName, BulkIngestMode mode)
+ throws AdbcException {
+ return JdbcStatement.ingestRoot(allocator, connection, targetTableName,
mode);
}
@Override
diff --git
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriver.java
b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriver.java
index 4a88503..b3621ac 100644
---
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriver.java
+++
b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriver.java
@@ -35,7 +35,7 @@ public enum JdbcDriver implements AdbcDriver {
}
@Override
- public AdbcDatabase connect(Map<String, String> parameters) throws
AdbcException {
+ public AdbcDatabase open(Map<String, String> parameters) throws
AdbcException {
return new JdbcDatabase(allocator, "jdbc:derby:" + parameters.get("path"));
}
}
diff --git
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java
b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java
index bf5c0db..99b46ac 100644
---
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java
+++
b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java
@@ -18,15 +18,17 @@
package org.apache.arrow.adbc.driver.jdbc;
import java.sql.Connection;
-import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
import java.util.Objects;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.core.AdbcStatusCode;
+import org.apache.arrow.adbc.core.BulkIngestMode;
import org.apache.arrow.adbc.driver.jdbc.util.JdbcParameterBinder;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
@@ -35,6 +37,16 @@ import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Field;
public class JdbcStatement implements AdbcStatement {
+ // Do our best to properly map database-specific errors to NOT_FOUND status.
+ private static final List<String> SQLSTATE_TABLE_NOT_FOUND =
+ Arrays.asList(
+ // Apache Derby
https://db.apache.org/derby/docs/10.4/ref/rrefexcept71493.html
+ "42X05",
+ // MySQL
+ //
https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-error-sqlstates.html
+ "42S02",
+ // Postgres
https://www.postgresql.org/docs/current/errcodes-appendix.html
+ "42P01");
private final BufferAllocator allocator;
private final Connection connection;
@@ -43,7 +55,7 @@ public class JdbcStatement implements AdbcStatement {
private String sqlQuery;
private ResultSet resultSet;
// State for bulk ingest
- private String bulkTargetTable;
+ private BulkState bulkOperation;
private VectorSchemaRoot bindRoot;
JdbcStatement(BufferAllocator allocator, Connection connection) {
@@ -52,17 +64,24 @@ public class JdbcStatement implements AdbcStatement {
this.sqlQuery = null;
}
- static AdbcStatement ingestRoot(
- BufferAllocator allocator, Connection connection, String
targetTableName) {
+ static JdbcStatement ingestRoot(
+ BufferAllocator allocator,
+ Connection connection,
+ String targetTableName,
+ BulkIngestMode mode) {
Objects.requireNonNull(targetTableName);
final JdbcStatement statement = new JdbcStatement(allocator, connection);
- statement.bulkTargetTable = targetTableName;
+ statement.bulkOperation = new BulkState();
+ statement.bulkOperation.mode = mode;
+ statement.bulkOperation.targetTable = targetTableName;
return statement;
}
@Override
public void setSqlQuery(String query) {
- bulkTargetTable = null;
+ if (bulkOperation != null) {
+ throw new IllegalStateException("Statement is configured for a bulk
ingest/append operation");
+ }
sqlQuery = query;
}
@@ -73,7 +92,7 @@ public class JdbcStatement implements AdbcStatement {
@Override
public void execute() throws AdbcException {
- if (bulkTargetTable != null) {
+ if (bulkOperation != null) {
executeBulk();
} else if (sqlQuery != null) {
executeSqlQuery();
@@ -84,7 +103,7 @@ public class JdbcStatement implements AdbcStatement {
private void createBulkTable() throws AdbcException {
final StringBuilder create = new StringBuilder("CREATE TABLE ");
- create.append(bulkTargetTable);
+ create.append(bulkOperation.targetTable);
create.append(" (");
for (int col = 0; col < bindRoot.getFieldVectors().size(); col++) {
if (col > 0) {
@@ -130,7 +149,8 @@ public class JdbcStatement implements AdbcStatement {
try (final Statement statement = connection.createStatement()) {
statement.execute(create.toString());
} catch (SQLException e) {
- throw JdbcDriverUtil.fromSqlException(e);
+ throw JdbcDriverUtil.fromSqlException(
+ AdbcStatusCode.ALREADY_EXISTS, "Could not create table %s", e,
bulkOperation.targetTable);
}
}
@@ -139,25 +159,14 @@ public class JdbcStatement implements AdbcStatement {
throw new IllegalStateException("Must bind() before bulk insert");
}
- // Check if table exists, create it if necessary.
- // XXX: TOC/TOU fallacy.
- try {
- final DatabaseMetaData dbmd = connection.getMetaData();
- try (final ResultSet rs =
- dbmd.getTables(/*catalog*/ null, /*schema*/ null, bulkTargetTable,
/*types*/ null)) {
- if (!rs.next()) {
- createBulkTable();
- }
- }
- } catch (SQLException e) {
- throw JdbcDriverUtil.fromSqlException(
- "Could not determine if table %s exists: ", e, bulkTargetTable);
+ if (bulkOperation.mode == BulkIngestMode.CREATE) {
+ createBulkTable();
}
// XXX: potential injection
// TODO: consider (optionally?) depending on jOOQ to generate SQL and
support different dialects
final StringBuilder insert = new StringBuilder("INSERT INTO ");
- insert.append(bulkTargetTable);
+ insert.append(bulkOperation.targetTable);
insert.append(" VALUES (");
for (int col = 0; col < bindRoot.getFieldVectors().size(); col++) {
if (col > 0) {
@@ -171,11 +180,17 @@ public class JdbcStatement implements AdbcStatement {
try {
statement = connection.prepareStatement(insert.toString());
} catch (SQLException e) {
+ // It's hard to differentiate between 'table not found' and parameter
type/count mismatch here
+ // because SQLState is inconsistent (see SQLSTATE_TABLE_NOT_FOUND
above). We could query for
+ // table existence but that's another roundtrip and leads to a TOC/TOU
+ // error. Instead, we hard-code some common codes here.
+
+ final AdbcStatusCode code =
+ SQLSTATE_TABLE_NOT_FOUND.contains(e.getSQLState())
+ ? AdbcStatusCode.NOT_FOUND
+ : AdbcStatusCode.ALREADY_EXISTS;
throw JdbcDriverUtil.fromSqlException(
- AdbcStatusCode.ALREADY_EXISTS,
- "Could not bulk insert into table %s: ",
- e,
- bulkTargetTable);
+ code, "Could not bulk insert into table %s: ", e,
bulkOperation.targetTable);
}
try {
try {
@@ -227,4 +242,9 @@ public class JdbcStatement implements AdbcStatement {
public void close() throws Exception {
AutoCloseables.close(resultSet, statement);
}
+
+ private static final class BulkState {
+ public BulkIngestMode mode;
+ String targetTable;
+ }
}
diff --git
a/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnectionMetadataTest.java
b/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnectionMetadataTest.java
index 024223d..9d8dca4 100644
---
a/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnectionMetadataTest.java
+++
b/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnectionMetadataTest.java
@@ -31,6 +31,6 @@ public class JdbcConnectionMetadataTest extends
AbstractConnectionMetadataTest {
protected AdbcDatabase init() throws AdbcException {
final Map<String, String> parameters = new HashMap<>();
parameters.put("path", tempDir.toString() + "/db;create=true");
- return JdbcDriver.INSTANCE.connect(parameters);
+ return JdbcDriver.INSTANCE.open(parameters);
}
}
diff --git
a/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnectionTest.java
b/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnectionTest.java
index bbfe424..4fcf69e 100644
---
a/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnectionTest.java
+++
b/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnectionTest.java
@@ -31,6 +31,6 @@ public class JdbcConnectionTest extends
AbstractConnectionTest {
protected AdbcDatabase init() throws AdbcException {
final Map<String, String> parameters = new HashMap<>();
parameters.put("path", tempDir.toString() + "/db;create=true");
- return JdbcDriver.INSTANCE.connect(parameters);
+ return JdbcDriver.INSTANCE.open(parameters);
}
}
diff --git
a/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatementTest.java
b/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatementTest.java
index d81347d..72a28f3 100644
---
a/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatementTest.java
+++
b/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatementTest.java
@@ -32,6 +32,6 @@ class JdbcStatementTest extends AbstractStatementTest {
protected AdbcDatabase init() throws AdbcException {
final Map<String, String> parameters = new HashMap<>();
parameters.put("path", tempDir.toString() + "/db;create=true");
- return JdbcDriver.INSTANCE.connect(parameters);
+ return JdbcDriver.INSTANCE.open(parameters);
}
}
diff --git
a/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcTransactionTest.java
b/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcTransactionTest.java
index 622f5e8..e5bcaeb 100644
---
a/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcTransactionTest.java
+++
b/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcTransactionTest.java
@@ -31,6 +31,6 @@ public class JdbcTransactionTest extends
AbstractTransactionTest {
protected AdbcDatabase init() throws AdbcException {
final Map<String, String> parameters = new HashMap<>();
parameters.put("path", tempDir.toString() + "/db;create=true");
- return JdbcDriver.INSTANCE.connect(parameters);
+ return JdbcDriver.INSTANCE.open(parameters);
}
}
diff --git
a/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java
b/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java
index bba9aac..0ded7d9 100644
---
a/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java
+++
b/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java
@@ -29,6 +29,7 @@ import org.apache.arrow.adbc.core.AdbcConnection;
import org.apache.arrow.adbc.core.AdbcDatabase;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.core.BulkIngestMode;
import org.apache.arrow.adbc.core.StandardSchemas;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
@@ -168,7 +169,7 @@ public abstract class AbstractConnectionMetadataTest {
Field.nullable("INTS", new ArrowType.Int(32, /*signed=*/
true)),
Field.nullable("STRS", new ArrowType.Utf8())));
try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
- try (final AdbcStatement stmt = connection.bulkIngest("FOO")) {
+ try (final AdbcStatement stmt = connection.bulkIngest("FOO",
BulkIngestMode.CREATE)) {
stmt.bind(root);
stmt.execute();
}
@@ -220,7 +221,7 @@ public abstract class AbstractConnectionMetadataTest {
// TODO: XXX: need a "quirks" system to handle idiosyncrasies. For
example: Derby forces table
// names to uppercase, but does not do case folding in all places.
- try (final AdbcStatement stmt = connection.bulkIngest("FOO")) {
+ try (final AdbcStatement stmt = connection.bulkIngest("FOO",
BulkIngestMode.CREATE)) {
stmt.bind(root);
stmt.execute();
}
diff --git
a/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractStatementTest.java
b/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractStatementTest.java
index f1870af..a47a339 100644
---
a/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractStatementTest.java
+++
b/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractStatementTest.java
@@ -29,6 +29,7 @@ import org.apache.arrow.adbc.core.AdbcDatabase;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.core.AdbcStatusCode;
+import org.apache.arrow.adbc.core.BulkIngestMode;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
@@ -63,7 +64,7 @@ public abstract class AbstractStatementTest {
}
@Test
- public void bulkInsert() throws Exception {
+ public void bulkInsertAppend() throws Exception {
final Schema schema =
new Schema(
Arrays.asList(
@@ -87,28 +88,26 @@ public abstract class AbstractStatementTest {
// TODO: XXX: need a "quirks" system to handle idiosyncrasies. For
example: Derby forces table
// names to uppercase, but does not do case folding in all places.
- try (final AdbcStatement stmt = connection.bulkIngest("FOO")) {
+ try (final AdbcStatement stmt = connection.bulkIngest("FOO",
BulkIngestMode.CREATE)) {
stmt.bind(root);
stmt.execute();
}
try (final AdbcStatement stmt = connection.createStatement()) {
stmt.setSqlQuery("SELECT * FROM foo");
- stmt.execute();
- try (ArrowReader arrowReader = stmt.getArrowReader()) {
+ try (ArrowReader arrowReader = stmt.executeQuery()) {
assertThat(arrowReader.loadNextBatch()).isTrue();
assertRoot(arrowReader.getVectorSchemaRoot()).isEqualTo(root);
}
}
// Append
- try (final AdbcStatement stmt = connection.bulkIngest("FOO")) {
+ try (final AdbcStatement stmt = connection.bulkIngest("FOO",
BulkIngestMode.APPEND)) {
stmt.bind(root);
stmt.execute();
}
try (final AdbcStatement stmt = connection.createStatement()) {
stmt.setSqlQuery("SELECT * FROM FOO");
- stmt.execute();
- try (ArrowReader arrowReader = stmt.getArrowReader()) {
+ try (ArrowReader arrowReader = stmt.executeQuery()) {
assertThat(arrowReader.loadNextBatch()).isTrue();
root.setRowCount(8);
ints.setSafe(4, 0);
@@ -123,14 +122,65 @@ public abstract class AbstractStatementTest {
}
}
}
+ }
- // Conflict
+ @Test
+ public void bulkIngestAppendConflict() throws Exception {
+ final Schema schema =
+ new Schema(
+ Arrays.asList(
+ Field.nullable("INTS", new ArrowType.Int(32, /*signed=*/
true)),
+ Field.nullable("STRS", new ArrowType.Utf8())));
final Schema schema2 =
new Schema(
Collections.singletonList(
Field.nullable("INTS", new ArrowType.Int(32, /*signed=*/
true))));
+ try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ try (final AdbcStatement stmt = connection.bulkIngest("FOO",
BulkIngestMode.CREATE)) {
+ stmt.bind(root);
+ stmt.execute();
+ }
+ }
try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema2,
allocator)) {
- try (final AdbcStatement stmt = connection.bulkIngest("FOO")) {
+ try (final AdbcStatement stmt = connection.bulkIngest("FOO",
BulkIngestMode.APPEND)) {
+ stmt.bind(root);
+ final AdbcException e = assertThrows(AdbcException.class,
stmt::execute);
+ assertThat(e.getStatus()).isEqualTo(AdbcStatusCode.ALREADY_EXISTS);
+ }
+ }
+ }
+
+ @Test
+ public void bulkIngestAppendNotFound() throws Exception {
+ final Schema schema =
+ new Schema(
+ Arrays.asList(
+ Field.nullable("INTS", new ArrowType.Int(32, /*signed=*/
true)),
+ Field.nullable("STRS", new ArrowType.Utf8())));
+ try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ try (final AdbcStatement stmt = connection.bulkIngest("FOO",
BulkIngestMode.APPEND)) {
+ stmt.bind(root);
+ final AdbcException e = assertThrows(AdbcException.class,
stmt::execute);
+ assertThat(e.getStatus()).isEqualTo(AdbcStatusCode.NOT_FOUND);
+ }
+ }
+ }
+
+ @Test
+ public void bulkIngestCreateConflict() throws Exception {
+ final Schema schema =
+ new Schema(
+ Arrays.asList(
+ Field.nullable("INTS", new ArrowType.Int(32, /*signed=*/
true)),
+ Field.nullable("STRS", new ArrowType.Utf8())));
+ try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ try (final AdbcStatement stmt = connection.bulkIngest("FOO",
BulkIngestMode.CREATE)) {
+ stmt.bind(root);
+ stmt.execute();
+ }
+ }
+ try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ try (final AdbcStatement stmt = connection.bulkIngest("FOO",
BulkIngestMode.CREATE)) {
stmt.bind(root);
final AdbcException e = assertThrows(AdbcException.class,
stmt::execute);
assertThat(e.getStatus()).isEqualTo(AdbcStatusCode.ALREADY_EXISTS);
diff --git
a/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractTransactionTest.java
b/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractTransactionTest.java
index 96f0247..cc2908e 100644
---
a/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractTransactionTest.java
+++
b/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractTransactionTest.java
@@ -25,6 +25,7 @@ import org.apache.arrow.adbc.core.AdbcConnection;
import org.apache.arrow.adbc.core.AdbcDatabase;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.core.BulkIngestMode;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
@@ -87,7 +88,7 @@ public abstract class AbstractTransactionTest {
ints.setSafe(0, 1);
ints.setSafe(1, 2);
root.setRowCount(2);
- try (final AdbcStatement stmt = connection.bulkIngest("foo")) {
+ try (final AdbcStatement stmt = connection.bulkIngest("foo",
BulkIngestMode.CREATE)) {
stmt.bind(root);
stmt.execute();
}
@@ -116,7 +117,7 @@ public abstract class AbstractTransactionTest {
ints.setSafe(0, 1);
ints.setSafe(1, 2);
root.setRowCount(2);
- try (final AdbcStatement stmt = connection.bulkIngest("foo")) {
+ try (final AdbcStatement stmt = connection.bulkIngest("foo",
BulkIngestMode.CREATE)) {
stmt.bind(root);
stmt.execute();
}
@@ -146,7 +147,7 @@ public abstract class AbstractTransactionTest {
ints.setSafe(0, 1);
ints.setSafe(1, 2);
root.setRowCount(2);
- try (final AdbcStatement stmt = connection.bulkIngest("foo")) {
+ try (final AdbcStatement stmt = connection.bulkIngest("foo",
BulkIngestMode.CREATE)) {
stmt.bind(root);
stmt.execute();
}