This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new ef70af6 [Java] Add basic Flight SQL driver (#47)
ef70af6 is described below
commit ef70af6aff8da61bbe2f19f14c607aa6c07caf33
Author: David Li <[email protected]>
AuthorDate: Tue Aug 2 15:33:30 2022 -0400
[Java] Add basic Flight SQL driver (#47)
* [Java] Add basic Flight SQL driver
* [Java] Add partition descriptor support
* [Java] Properly handle FlightEndpoint.locations
* [Java] Refactor handling of FlightEndpoint
* [Java] Consolidate catches
---
.../org/apache/arrow/adbc/core/AdbcDriver.java | 5 +
.../org/apache/arrow/adbc/core/AdbcException.java | 12 +
.../arrow/adbc/core/PartitionDescriptor.java | 17 +-
.../driver/{jdbc => flight-sql-validation}/pom.xml | 41 +--
.../flightsql/FlightSqlConnectionMetadataTest.java | 61 ++++
.../driver/flightsql/FlightSqlConnectionTest.java} | 18 +-
.../FlightSqlPartitionDescriptorTest.java} | 18 +-
.../adbc/driver/flightsql/FlightSqlQuirks.java} | 46 +--
.../driver/flightsql/FlightSqlStatementTest.java} | 23 +-
java/driver/{jdbc => flight-sql}/pom.xml | 62 ++--
.../adbc/driver/flightsql/FixedRootStatement.java | 66 +++++
.../adbc/driver/flightsql/FlightInfoReader.java | 158 ++++++++++
.../adbc/driver/flightsql/FlightSqlConnection.java | 141 +++++++++
.../adbc/driver/flightsql/FlightSqlDatabase.java} | 43 +--
.../adbc/driver/flightsql/FlightSqlDriver.java} | 37 ++-
.../adbc/driver/flightsql/FlightSqlDriverUtil.java | 78 +++++
.../adbc/driver/flightsql/FlightSqlStatement.java | 319 +++++++++++++++++++++
.../adbc/driver/flightsql/InfoMetadataBuilder.java | 156 ++++++++++
.../adbc/driver/flightsql/RootArrowReader.java | 68 +++++
.../arrow/adbc/driver/jdbc/derby/DerbyQuirks.java | 3 +-
.../driver/jdbc/postgresql/PostgresqlQuirks.java | 12 +-
java/driver/jdbc/pom.xml | 30 +-
.../arrow/adbc/driver/jdbc/JdbcConnection.java | 5 +-
.../arrow/adbc/driver/jdbc/JdbcDatabase.java | 5 +-
.../apache/arrow/adbc/driver/jdbc/JdbcDriver.java | 18 +-
.../arrow/adbc/driver/jdbc/JdbcStatement.java | 17 +-
.../testsuite/AbstractConnectionMetadataTest.java | 16 +-
.../testsuite/AbstractPartitionDescriptorTest.java | 122 ++++++++
.../driver/testsuite/AbstractStatementTest.java | 11 +-
java/pom.xml | 24 ++
java/{driver/jdbc => sql}/pom.xml | 49 +---
.../java/org/apache/arrow/adbc/sql/SqlQuirks.java} | 12 +-
.../org/apache/arrow/adbc/sql/package-info.java} | 19 +-
33 files changed, 1417 insertions(+), 295 deletions(-)
diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
index 8b71f95..80abd18 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
@@ -21,6 +21,11 @@ import java.util.Map;
/** A handle to an ADBC database driver. */
public interface AdbcDriver {
+ /** The standard parameter name for a connection URL (type String). */
+ String PARAM_URL = "adbc.url";
+ /** The standard parameter name for SQL quirks configuration (type SqlQuirks). */
+ String PARAM_SQL_QUIRKS = "adbc.sql.quirks";
+
/**
* Open a database via this driver.
*
diff --git
a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java
b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java
index 66de077..c865f2e 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java
@@ -48,6 +48,11 @@ public class AdbcException extends Exception {
return new AdbcException(message, /*cause*/ null,
AdbcStatusCode.INVALID_ARGUMENT, null, 0);
}
+ /**
+ * Create a new exception with code {@link AdbcStatusCode#IO}.
+ *
+ * <p>The exception is created without a cause.
+ *
+ * @param message the exception message
+ * @return the new exception
+ */
+ public static AdbcException io(String message) {
+ return new AdbcException(message, /*cause*/ null, AdbcStatusCode.IO, null,
0);
+ }
+
/** Create a new exception with code {@link AdbcStatusCode#INVALID_STATE}. */
public static AdbcException invalidState(String message) {
return new AdbcException(message, /*cause*/ null,
AdbcStatusCode.INVALID_STATE, null, 0);
@@ -70,6 +75,13 @@ public class AdbcException extends Exception {
return vendorCode;
}
+ /**
+ * Copy this exception with a different cause (a convenience for use with the static factories).
+ *
+ * <p>The message, status, SQLSTATE, and vendor code of this exception are passed on to the
+ * copy; this instance is not modified.
+ *
+ * @param cause the cause to attach to the copy
+ * @return a new exception carrying the given cause
+ */
+ public AdbcException withCause(Throwable cause) {
+ return new AdbcException(this.getMessage(), cause, status, sqlState,
vendorCode);
+ }
+
@Override
public String toString() {
return "AdbcException{"
diff --git
a/java/core/src/main/java/org/apache/arrow/adbc/core/PartitionDescriptor.java
b/java/core/src/main/java/org/apache/arrow/adbc/core/PartitionDescriptor.java
index b625bf6..3f20478 100644
---
a/java/core/src/main/java/org/apache/arrow/adbc/core/PartitionDescriptor.java
+++
b/java/core/src/main/java/org/apache/arrow/adbc/core/PartitionDescriptor.java
@@ -21,11 +21,9 @@ import java.util.Objects;
/** An opaque descriptor for a part of a potentially distributed or
partitioned result set. */
public final class PartitionDescriptor {
- private final String friendlyName;
private final ByteBuffer descriptor;
- public PartitionDescriptor(final String friendlyName, final ByteBuffer
descriptor) {
- this.friendlyName = friendlyName;
+ public PartitionDescriptor(final ByteBuffer descriptor) {
this.descriptor = Objects.requireNonNull(descriptor);
}
@@ -42,23 +40,16 @@ public final class PartitionDescriptor {
return false;
}
PartitionDescriptor that = (PartitionDescriptor) o;
- return Objects.equals(friendlyName, that.friendlyName)
- && getDescriptor().equals(that.getDescriptor());
+ return descriptor.equals(that.descriptor);
}
@Override
public int hashCode() {
- return Objects.hash(friendlyName, getDescriptor());
+ return Objects.hash(descriptor);
}
@Override
public String toString() {
- return "PartitionDescriptor{"
- + "friendlyName='"
- + friendlyName
- + '\''
- + ", descriptor="
- + descriptor
- + '}';
+ return "PartitionDescriptor{" + "descriptor=" + descriptor + '}';
}
}
diff --git a/java/driver/jdbc/pom.xml
b/java/driver/flight-sql-validation/pom.xml
similarity index 64%
copy from java/driver/jdbc/pom.xml
copy to java/driver/flight-sql-validation/pom.xml
index 0ea8d43..3ad44e2 100644
--- a/java/driver/jdbc/pom.xml
+++ b/java/driver/flight-sql-validation/pom.xml
@@ -18,51 +18,20 @@
<relativePath>../../pom.xml</relativePath>
</parent>
- <artifactId>adbc-driver-jdbc</artifactId>
+ <artifactId>adbc-driver-flight-sql-validation</artifactId>
<packaging>jar</packaging>
- <name>Arrow ADBC Driver JDBC</name>
- <description>An ADBC driver wrapping the JDBC API.</description>
+ <name>Arrow ADBC Driver Flight SQL Validation</name>
+ <description>Tests validating the Flight SQL driver.</description>
<dependencies>
- <!-- Arrow -->
- <dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-jdbc</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-memory-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-vector</artifactId>
- </dependency>
-
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.arrow.adbc</groupId>
- <artifactId>adbc-driver-jdbc-util</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.arrow.adbc</groupId>
- <artifactId>adbc-driver-manager</artifactId>
- </dependency>
-
- <!-- Derby -->
- <!-- Cannot upgrade beyond this version for Java 8 support -->
- <dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derby</artifactId>
- <version>10.14.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derbytools</artifactId>
- <version>10.14.2.0</version>
+ <groupId>org.apache.arrow.adbc</groupId>
+ <artifactId>adbc-driver-flight-sql</artifactId>
<scope>test</scope>
</dependency>
diff --git
a/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionMetadataTest.java
b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionMetadataTest.java
new file mode 100644
index 0000000..da56280
--- /dev/null
+++
b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionMetadataTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adbc.driver.flightsql;
+
+import org.apache.arrow.adbc.driver.testsuite.AbstractConnectionMetadataTest;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+
+/**
+ * Binds {@link AbstractConnectionMetadataTest} to the Flight SQL driver by installing
+ * {@link FlightSqlQuirks} before the tests run.
+ *
+ * <p>Every test overridden below is annotated {@code @Disabled("Not yet implemented")}.
+ */
+public class FlightSqlConnectionMetadataTest extends
AbstractConnectionMetadataTest {
+ /** Installs the Flight SQL quirks used by the inherited tests. */
+ @BeforeAll
+ public static void beforeAll() {
+ quirks = new FlightSqlQuirks();
+ }
+
+ // NOTE(review): this override has an empty body, unlike the overrides further down that
+ // delegate to super; if @Disabled is removed it would pass without testing anything.
+ @Override
+ @Disabled("Not yet implemented")
+ public void getObjectsColumns() throws Exception {}
+
+ // NOTE(review): empty body as well; see the note on getObjectsColumns.
+ @Override
+ @Disabled("Not yet implemented")
+ public void getObjectsCatalogs() throws Exception {}
+
+ // Delegates to the shared test; currently disabled.
+ @Override
+ @Disabled("Not yet implemented")
+ public void getObjectsDbSchemas() throws Exception {
+ super.getObjectsDbSchemas();
+ }
+
+ // Delegates to the shared test; currently disabled.
+ @Override
+ @Disabled("Not yet implemented")
+ public void getObjectsTables() throws Exception {
+ super.getObjectsTables();
+ }
+
+ // Delegates to the shared test; currently disabled.
+ @Override
+ @Disabled("Not yet implemented")
+ public void getTableSchema() throws Exception {
+ super.getTableSchema();
+ }
+
+ // Delegates to the shared test; currently disabled.
+ @Override
+ @Disabled("Not yet implemented")
+ public void getTableTypes() throws Exception {
+ super.getTableTypes();
+ }
+}
diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionTest.java
similarity index 71%
copy from java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
copy to
java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionTest.java
index 8b71f95..6137d4e 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
+++
b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionTest.java
@@ -15,16 +15,14 @@
* limitations under the License.
*/
-package org.apache.arrow.adbc.core;
+package org.apache.arrow.adbc.driver.flightsql;
-import java.util.Map;
+import org.apache.arrow.adbc.driver.testsuite.AbstractConnectionTest;
+import org.junit.jupiter.api.BeforeAll;
-/** A handle to an ADBC database driver. */
-public interface AdbcDriver {
- /**
- * Open a database via this driver.
- *
- * @param parameters Driver-specific parameters.
- */
- AdbcDatabase open(Map<String, Object> parameters) throws AdbcException;
+public class FlightSqlConnectionTest extends AbstractConnectionTest {
+ @BeforeAll
+ public static void beforeAll() {
+ quirks = new FlightSqlQuirks();
+ }
}
diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlPartitionDescriptorTest.java
similarity index 70%
copy from java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
copy to
java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlPartitionDescriptorTest.java
index 8b71f95..3bd2969 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
+++
b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlPartitionDescriptorTest.java
@@ -15,16 +15,14 @@
* limitations under the License.
*/
-package org.apache.arrow.adbc.core;
+package org.apache.arrow.adbc.driver.flightsql;
-import java.util.Map;
+import org.apache.arrow.adbc.driver.testsuite.AbstractPartitionDescriptorTest;
+import org.junit.jupiter.api.BeforeAll;
-/** A handle to an ADBC database driver. */
-public interface AdbcDriver {
- /**
- * Open a database via this driver.
- *
- * @param parameters Driver-specific parameters.
- */
- AdbcDatabase open(Map<String, Object> parameters) throws AdbcException;
+class FlightSqlPartitionDescriptorTest extends AbstractPartitionDescriptorTest
{
+ @BeforeAll
+ public static void beforeAll() {
+ quirks = new FlightSqlQuirks();
+ }
}
diff --git
a/java/driver/jdbc-validation-derby/src/test/java/org/apache/arrow/adbc/driver/jdbc/derby/DerbyQuirks.java
b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlQuirks.java
similarity index 52%
copy from
java/driver/jdbc-validation-derby/src/test/java/org/apache/arrow/adbc/driver/jdbc/derby/DerbyQuirks.java
copy to
java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlQuirks.java
index 877f512..d2cd614 100644
---
a/java/driver/jdbc-validation-derby/src/test/java/org/apache/arrow/adbc/driver/jdbc/derby/DerbyQuirks.java
+++
b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlQuirks.java
@@ -15,41 +15,51 @@
* limitations under the License.
*/
-package org.apache.arrow.adbc.driver.jdbc.derby;
+package org.apache.arrow.adbc.driver.flightsql;
-import java.nio.file.Path;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import org.apache.arrow.adbc.core.AdbcDatabase;
+import org.apache.arrow.adbc.core.AdbcDriver;
import org.apache.arrow.adbc.core.AdbcException;
-import org.apache.arrow.adbc.driver.jdbc.JdbcDriver;
import org.apache.arrow.adbc.driver.testsuite.SqlValidationQuirks;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightRuntimeException;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.sql.FlightSqlClient;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.junit.jupiter.api.Assumptions;
-public class DerbyQuirks extends SqlValidationQuirks {
- private final String jdbcUrl;
+public class FlightSqlQuirks extends SqlValidationQuirks {
+ static final String FLIGHT_SQL_LOCATION_ENV_VAR = "ADBC_FLIGHT_SQL_LOCATION";
- public DerbyQuirks(Path databaseRoot) {
- this.jdbcUrl = "jdbc:derby:" + databaseRoot.toString() + "/db;create=true";
+ static String getFlightLocation() {
+ final String location = System.getenv(FLIGHT_SQL_LOCATION_ENV_VAR);
+ Assumptions.assumeFalse(
+ location == null || location.isEmpty(),
+ "Flight SQL server not found, set " + FLIGHT_SQL_LOCATION_ENV_VAR);
+ return location;
}
@Override
public AdbcDatabase initDatabase() throws AdbcException {
+ String url = getFlightLocation();
+
final Map<String, Object> parameters = new HashMap<>();
- parameters.put("adbc.jdbc.url", jdbcUrl);
- return JdbcDriver.INSTANCE.open(parameters);
+ parameters.put(AdbcDriver.PARAM_URL, url);
+ return FlightSqlDriver.INSTANCE.open(parameters);
}
@Override
public void cleanupTable(String name) throws Exception {
- try (final Connection connection1 = DriverManager.getConnection(jdbcUrl)) {
- try (Statement statement = connection1.createStatement()) {
- statement.execute("DROP TABLE " + name);
- } catch (SQLException ignored) {
- }
+ try (final BufferAllocator allocator = new RootAllocator();
+ final FlightSqlClient client =
+ new FlightSqlClient(
+ FlightClient.builder(allocator, new
Location(getFlightLocation())).build())) {
+ client.executeUpdate("DROP TABLE " + name);
+ } catch (FlightRuntimeException e) {
+ // Ignored
}
}
diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlStatementTest.java
similarity index 64%
copy from java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
copy to
java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlStatementTest.java
index 8b71f95..306f69e 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
+++
b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlStatementTest.java
@@ -15,16 +15,19 @@
* limitations under the License.
*/
-package org.apache.arrow.adbc.core;
+package org.apache.arrow.adbc.driver.flightsql;
-import java.util.Map;
+import org.apache.arrow.adbc.driver.testsuite.AbstractStatementTest;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
-/** A handle to an ADBC database driver. */
-public interface AdbcDriver {
- /**
- * Open a database via this driver.
- *
- * @param parameters Driver-specific parameters.
- */
- AdbcDatabase open(Map<String, Object> parameters) throws AdbcException;
+class FlightSqlStatementTest extends AbstractStatementTest {
+ @BeforeAll
+ public static void beforeAll() {
+ quirks = new FlightSqlQuirks();
+ }
+
+ @Override
+ @Disabled("Requires spec clarification")
+ public void prepareQueryWithParameters() {}
}
diff --git a/java/driver/jdbc/pom.xml b/java/driver/flight-sql/pom.xml
similarity index 66%
copy from java/driver/jdbc/pom.xml
copy to java/driver/flight-sql/pom.xml
index 0ea8d43..02033c7 100644
--- a/java/driver/jdbc/pom.xml
+++ b/java/driver/flight-sql/pom.xml
@@ -18,17 +18,25 @@
<relativePath>../../pom.xml</relativePath>
</parent>
- <artifactId>adbc-driver-jdbc</artifactId>
+ <artifactId>adbc-driver-flight-sql</artifactId>
<packaging>jar</packaging>
- <name>Arrow ADBC Driver JDBC</name>
- <description>An ADBC driver wrapping the JDBC API.</description>
+ <name>Arrow ADBC Driver Flight SQL</name>
+ <description>An ADBC driver wrapping Flight SQL.</description>
<dependencies>
- <!-- Arrow -->
<dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-jdbc</artifactId>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <!-- Latest version still supporting Java 8 -->
+ <version>2.9.3</version>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>3.21.3</version>
+ </dependency>
+
+ <!-- Arrow -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-core</artifactId>
@@ -37,50 +45,26 @@
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</dependency>
-
<dependency>
- <groupId>org.apache.arrow.adbc</groupId>
- <artifactId>adbc-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.arrow.adbc</groupId>
- <artifactId>adbc-driver-jdbc-util</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.arrow.adbc</groupId>
- <artifactId>adbc-driver-manager</artifactId>
- </dependency>
-
- <!-- Derby -->
- <!-- Cannot upgrade beyond this version for Java 8 support -->
- <dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derby</artifactId>
- <version>10.14.2.0</version>
- <scope>test</scope>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>flight-core</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derbytools</artifactId>
- <version>10.14.2.0</version>
- <scope>test</scope>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>flight-sql</artifactId>
</dependency>
- <!-- Testing -->
<dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- <scope>test</scope>
+ <groupId>org.apache.arrow.adbc</groupId>
+ <artifactId>adbc-core</artifactId>
</dependency>
<dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter</artifactId>
- <scope>test</scope>
+ <groupId>org.apache.arrow.adbc</groupId>
+ <artifactId>adbc-driver-manager</artifactId>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
- <artifactId>adbc-driver-validation</artifactId>
- <scope>test</scope>
+ <artifactId>adbc-sql</artifactId>
</dependency>
</dependencies>
</project>
diff --git
a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FixedRootStatement.java
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FixedRootStatement.java
new file mode 100644
index 0000000..a7a6787
--- /dev/null
+++
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FixedRootStatement.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adbc.driver.flightsql;
+
+import java.util.Collections;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/**
+ * An AdbcStatement implementation that returns fixed data from a root.
+ *
+ * <p>The contents of the root are unloaded into a record batch when the statement is
+ * constructed, so the statement does not depend on the root afterwards. The batch can be handed
+ * out exactly once via {@link #getArrowReader()}; until then it is released by {@link #close()}.
+ */
+class FixedRootStatement implements AdbcStatement {
+  private final BufferAllocator allocator;
+  private final Schema overrideSchema;
+  /** The unloaded data; {@code null} once handed to a reader or released by close(). */
+  private ArrowRecordBatch recordBatch;
+
+  /**
+   * Snapshot the given root.
+   *
+   * @param allocator the allocator passed on to the reader created by getArrowReader()
+   * @param root the root whose schema and current contents are captured
+   */
+  public FixedRootStatement(BufferAllocator allocator, VectorSchemaRoot root) {
+    this.allocator = allocator;
+    this.overrideSchema = root.getSchema();
+    // Unload the root to preserve the data
+    recordBatch = new VectorUnloader(root).getRecordBatch();
+  }
+
+  /** Always fails: the data is fixed, so there is nothing to execute. */
+  @Override
+  public void execute() throws AdbcException {
+    throw AdbcException.invalidState("[Flight SQL] Cannot execute() this statement");
+  }
+
+  /**
+   * Create a reader over the captured batch.
+   *
+   * @return a reader over the single captured record batch
+   * @throws AdbcException if the batch was already handed out or the statement was closed
+   */
+  @Override
+  public ArrowReader getArrowReader() throws AdbcException {
+    // Without this guard a second call would build a reader over a list containing null.
+    if (recordBatch == null) {
+      throw AdbcException.invalidState(
+          "[Flight SQL] getArrowReader() was already called on this statement");
+    }
+    final ArrowReader reader =
+        new RootArrowReader(allocator, overrideSchema, Collections.singletonList(recordBatch));
+    // The batch now belongs to the reader (presumably released there); do not close it twice.
+    recordBatch = null;
+    return reader;
+  }
+
+  /** Always fails: the data is fixed, so there is nothing to prepare. */
+  @Override
+  public void prepare() throws AdbcException {
+    throw AdbcException.invalidState("[Flight SQL] Cannot prepare() this statement");
+  }
+
+  /** Release the captured batch if it was never handed to a reader. Safe to call repeatedly. */
+  @Override
+  public void close() throws Exception {
+    final ArrowRecordBatch pending = recordBatch;
+    recordBatch = null;
+    AutoCloseables.close(pending);
+  }
+}
diff --git
a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightInfoReader.java
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightInfoReader.java
new file mode 100644
index 0000000..9b0cda9
--- /dev/null
+++
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightInfoReader.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adbc.driver.flightsql;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.core.AdbcStatusCode;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightRuntimeException;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.sql.FlightSqlClient;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/**
+ * An ArrowReader that wraps a FlightInfo.
+ *
+ * <p>The endpoints are read one after another. The schema of the first stream is taken as the
+ * schema of the reader, and every later stream must report an equal schema. When an endpoint
+ * lists locations, they are tried in random order using clients from the shared cache;
+ * otherwise the stream is fetched through the main client.
+ */
+public class FlightInfoReader extends ArrowReader {
+  private final Schema schema;
+  private final FlightSqlClient client;
+  private final LoadingCache<Location, FlightClient> clientCache;
+  private final List<FlightEndpoint> flightEndpoints;
+  private int nextEndpointIndex;
+  private FlightStream currentStream;
+  private long bytesRead;
+
+  /**
+   * Open the first endpoint and initialize the reader.
+   *
+   * @param allocator the allocator for the reader's root
+   * @param client the client used for endpoints that list no locations
+   * @param clientCache clients keyed by location, used for endpoints that list locations
+   * @param flightEndpoints the endpoints to read, in order
+   * @throws AdbcException if there are no endpoints, the first stream cannot be opened, or the
+   *     reader cannot be initialized
+   */
+  FlightInfoReader(
+      BufferAllocator allocator,
+      FlightSqlClient client,
+      LoadingCache<Location, FlightClient> clientCache,
+      List<FlightEndpoint> flightEndpoints)
+      throws AdbcException {
+    super(allocator);
+    this.client = client;
+    this.clientCache = clientCache;
+    this.flightEndpoints = flightEndpoints;
+    this.nextEndpointIndex = 0;
+    this.bytesRead = 0;
+
+    // The schema is taken from the first stream, so at least one endpoint is required.
+    if (flightEndpoints.isEmpty()) {
+      throw AdbcException.invalidState(
+          FlightSqlDriverUtil.prefixExceptionMessage("FlightInfo has no endpoints to read"));
+    }
+
+    try {
+      // Use the same location-aware lookup as for later endpoints.
+      this.currentStream = tryLoadNextStream(flightEndpoints.get(this.nextEndpointIndex++));
+      this.schema = this.currentStream.getSchema();
+    } catch (FlightRuntimeException e) {
+      throw FlightSqlDriverUtil.fromFlightException(e);
+    } catch (IOException e) {
+      throw ioFailure(e);
+    }
+
+    try {
+      this.ensureInitialized();
+    } catch (IOException e) {
+      throw ioFailure(e);
+    }
+  }
+
+  /** Wrap an IOException in an AdbcException with status IO. */
+  private static AdbcException ioFailure(IOException e) {
+    return new AdbcException(
+        FlightSqlDriverUtil.prefixExceptionMessage(e.getMessage()), e, AdbcStatusCode.IO, null, 0);
+  }
+
+  /**
+   * Load the next batch, moving on to the next endpoint whenever the current stream is
+   * exhausted.
+   *
+   * @return true if a batch was loaded, false once all endpoints are exhausted
+   * @throws IOException if a stream cannot be opened or reports a different schema
+   */
+  @Override
+  public boolean loadNextBatch() throws IOException {
+    // A freshly opened stream must be advanced with next() before its root holds data, and it
+    // may itself be empty, so keep going until some stream yields a batch.
+    while (!currentStream.next()) {
+      if (nextEndpointIndex >= flightEndpoints.size()) {
+        return false;
+      }
+      openNextEndpoint();
+    }
+    final VectorSchemaRoot root = currentStream.getRoot();
+    final VectorUnloader unloader = new VectorUnloader(root);
+    final ArrowRecordBatch recordBatch = unloader.getRecordBatch();
+    bytesRead += recordBatch.computeBodyLength();
+    loadRecordBatch(recordBatch);
+    return true;
+  }
+
+  /** Close the current stream and open the stream of the next endpoint, checking its schema. */
+  private void openNextEndpoint() throws IOException {
+    try {
+      currentStream.close();
+      FlightEndpoint endpoint = flightEndpoints.get(nextEndpointIndex++);
+      currentStream = tryLoadNextStream(endpoint);
+      if (!schema.equals(currentStream.getSchema())) {
+        throw new IOException(
+            "Stream has inconsistent schema. Expected: "
+                + schema
+                + "\nFound: "
+                + currentStream.getSchema());
+      }
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Open the stream for an endpoint.
+   *
+   * <p>With no locations the main client is used. Otherwise the locations are tried in random
+   * order; the first failure is thrown with later failures attached as suppressed exceptions.
+   */
+  private FlightStream tryLoadNextStream(FlightEndpoint endpoint) throws IOException {
+    if (endpoint.getLocations().isEmpty()) {
+      return client.getStream(endpoint.getTicket());
+    } else {
+      List<Location> locations = new ArrayList<>(endpoint.getLocations());
+      Collections.shuffle(locations);
+      IOException failure = null;
+      for (final Location location : locations) {
+        try {
+          return Objects.requireNonNull(clientCache.get(location)).getStream(endpoint.getTicket());
+        } catch (RuntimeException e) {
+          // Also handles CompletionException (from clientCache#get), FlightRuntimeException
+          if (failure == null) {
+            failure =
+                new IOException("Failed to get stream from location " + location + ": " + e, e);
+          } else {
+            failure.addSuppressed(
+                new IOException("Failed to get stream from location " + location + ": " + e, e));
+          }
+        }
+      }
+      throw Objects.requireNonNull(failure);
+    }
+  }
+
+  /** Returns the sum of the body lengths of the batches loaded so far. */
+  @Override
+  public long bytesRead() {
+    return bytesRead;
+  }
+
+  /** Closes the current stream, wrapping any failure in an IOException. */
+  @Override
+  protected void closeReadSource() throws IOException {
+    try {
+      currentStream.close();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Returns the schema reported by the first stream. */
+  @Override
+  protected Schema readSchema() {
+    return schema;
+  }
+}
diff --git
a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnection.java
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnection.java
new file mode 100644
index 0000000..5e9c415
--- /dev/null
+++
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnection.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adbc.driver.flightsql;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.arrow.adbc.core.AdbcConnection;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.core.BulkIngestMode;
+import org.apache.arrow.adbc.sql.SqlQuirks;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.Ticket;
+import org.apache.arrow.flight.impl.Flight;
+import org.apache.arrow.flight.sql.FlightSqlClient;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+/**
+ * An {@link AdbcConnection} backed by a Flight SQL client.
+ *
+ * <p>Besides the main {@link FlightSqlClient}, the connection keeps a cache of {@link
+ * FlightClient}s keyed by {@link Location}. The cache is handed to every statement so that
+ * endpoints which name explicit locations can be fetched from those locations. Cached clients
+ * are closed when they are evicted and when the connection itself is closed.
+ *
+ * <p>Transactions are not supported: the connection always reports auto-commit mode.
+ */
+public class FlightSqlConnection implements AdbcConnection {
+  private final BufferAllocator allocator;
+  private final FlightSqlClient client;
+  private final SqlQuirks quirks;
+  private final LoadingCache<Location, FlightClient> clientCache;
+
+  /**
+   * Creates a connection.
+   *
+   * @param allocator allocator used for results and for the cached per-location clients; it is
+   *     closed by {@link #close()}
+   * @param client the Flight client for the main server; wrapped in a {@link FlightSqlClient}
+   * @param quirks SQL dialect settings passed on to statements
+   */
+  FlightSqlConnection(BufferAllocator allocator, FlightClient client, SqlQuirks quirks) {
+    this.allocator = allocator;
+    this.client = new FlightSqlClient(client);
+    this.quirks = quirks;
+    this.clientCache =
+        Caffeine.newBuilder()
+            .expireAfterAccess(5, TimeUnit.MINUTES)
+            // Run the removal listener on the calling thread. close() relies on every cached
+            // client having been shut down by the time invalidateAll() returns, because those
+            // clients allocate from the allocator that close() releases next.
+            .executor(Runnable::run)
+            .removalListener(
+                (Location key, FlightClient value, RemovalCause cause) -> {
+                  if (value == null) return;
+                  try {
+                    value.close();
+                  } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                  }
+                })
+            .build(location -> FlightClient.builder(allocator, location).build());
+  }
+
+  /** Always fails: transactions are not supported. */
+  @Override
+  public void commit() throws AdbcException {
+    throw AdbcException.notImplemented("[Flight SQL] Transaction methods are not supported");
+  }
+
+  /** Creates a new, unconfigured statement that shares this connection's clients. */
+  @Override
+  public AdbcStatement createStatement() throws AdbcException {
+    return new FlightSqlStatement(allocator, client, clientCache, quirks);
+  }
+
+  /**
+   * Rebuilds a statement from a serialized partition descriptor.
+   *
+   * <p>The descriptor is parsed as a protobuf {@code FlightEndpoint} (a ticket plus zero or more
+   * location URIs). The returned statement is primed with that single endpoint.
+   *
+   * @param descriptor the serialized endpoint
+   * @return a statement holding the decoded endpoint
+   * @throws AdbcException with an invalid-argument status if the bytes are not a valid endpoint
+   *     message or contain a malformed location URI
+   */
+  @Override
+  public AdbcStatement deserializePartitionDescriptor(ByteBuffer descriptor) throws AdbcException {
+    final FlightEndpoint endpoint;
+    try {
+      final Flight.FlightEndpoint protoEndpoint = Flight.FlightEndpoint.parseFrom(descriptor);
+      final Location[] locations = new Location[protoEndpoint.getLocationCount()];
+      int index = 0;
+      for (Flight.Location protoLocation : protoEndpoint.getLocationList()) {
+        locations[index++] = new Location(protoLocation.getUri());
+      }
+
+      endpoint =
+          new FlightEndpoint(
+              new Ticket(protoEndpoint.getTicket().getTicket().toByteArray()), locations);
+    } catch (InvalidProtocolBufferException | URISyntaxException e) {
+      throw AdbcException.invalidArgument(
+              "[Flight SQL] Partition descriptor is invalid: " + e.getMessage())
+          .withCause(e);
+    }
+
+    return FlightSqlStatement.fromDescriptor(
+        allocator, client, clientCache, quirks, Collections.singletonList(endpoint));
+  }
+
+  /** Creates a statement configured to bulk-ingest bound data into {@code targetTableName}. */
+  @Override
+  public AdbcStatement bulkIngest(String targetTableName, BulkIngestMode mode)
+      throws AdbcException {
+    return FlightSqlStatement.ingestRoot(
+        allocator, client, clientCache, quirks, targetTableName, mode);
+  }
+
+  /**
+   * Returns driver/vendor metadata as a statement over a fixed result.
+   *
+   * @param infoCodes the info codes to fetch; passed through to {@link InfoMetadataBuilder}
+   * @throws AdbcException if the metadata cannot be built
+   */
+  @Override
+  public AdbcStatement getInfo(int[] infoCodes) throws AdbcException {
+    final VectorSchemaRoot root;
+    // The builder owns its root until build() succeeds; closing it here releases the root when
+    // build() fails and is a no-op otherwise.
+    try (final InfoMetadataBuilder builder =
+        new InfoMetadataBuilder(allocator, client, infoCodes)) {
+      root = builder.build();
+    } catch (AdbcException | RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw AdbcException.io("[Flight SQL] " + e.getMessage()).withCause(e);
+    }
+    return new FixedRootStatement(allocator, root);
+  }
+
+  /** Always fails: transactions are not supported. */
+  @Override
+  public void rollback() throws AdbcException {
+    throw AdbcException.notImplemented("[Flight SQL] Transaction methods are not supported");
+  }
+
+  /** Always {@code true}: the connection only operates in auto-commit mode. */
+  @Override
+  public boolean getAutoCommit() throws AdbcException {
+    return true;
+  }
+
+  /**
+   * Accepts {@code true} as a no-op and rejects {@code false}.
+   *
+   * @throws AdbcException with a not-implemented status when asked to disable auto-commit
+   */
+  @Override
+  public void setAutoCommit(boolean enableAutoCommit) throws AdbcException {
+    if (!enableAutoCommit) {
+      throw AdbcException.notImplemented("[Flight SQL] Transaction methods are not supported");
+    }
+  }
+
+  /**
+   * Releases everything the connection owns: the cached per-location clients, the main client
+   * and finally the allocator.
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      // Evicts every cached client; the removal listener closes each one synchronously.
+      clientCache.invalidateAll();
+    } finally {
+      try {
+        client.close();
+      } finally {
+        allocator.close();
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "FlightSqlConnection{" + "client=" + client + '}';
+  }
+}
diff --git
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDatabase.java
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDatabase.java
similarity index 62%
copy from
java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDatabase.java
copy to
java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDatabase.java
index 4cef0e4..11ca360 100644
---
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDatabase.java
+++
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDatabase.java
@@ -15,60 +15,61 @@
* limitations under the License.
*/
-package org.apache.arrow.adbc.driver.jdbc;
+package org.apache.arrow.adbc.driver.flightsql;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.arrow.adbc.core.AdbcConnection;
import org.apache.arrow.adbc.core.AdbcDatabase;
import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.sql.SqlQuirks;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightRuntimeException;
+import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.BufferAllocator;
/** An instance of a database (e.g. a handle to an in-memory database). */
-public final class JdbcDatabase implements AdbcDatabase {
+public final class FlightSqlDatabase implements AdbcDatabase {
   private final BufferAllocator allocator;
-  private final String target;
-  private final JdbcDriverQuirks quirks;
-  private final Connection connection;
+  private final Location location;
+  private final SqlQuirks quirks;
+  // NOTE(review): this client is only ever closed in close(); connect() builds its own client.
+  private final FlightClient client;
   private final AtomicInteger counter;
-  JdbcDatabase(BufferAllocator allocator, final String target, JdbcDriverQuirks quirks)
+  /**
+   * Creates a database handle for one Flight SQL server.
+   *
+   * @param allocator parent allocator; each connection receives a child of it
+   * @param location the server location used for every client built by this database
+   * @param quirks SQL dialect settings handed to each connection
+   * @throws AdbcException if the Flight client cannot be built
+   */
+  FlightSqlDatabase(BufferAllocator allocator, Location location, SqlQuirks quirks)
       throws AdbcException {
     this.allocator = allocator;
-    this.target = target;
+    this.location = location;
     this.quirks = quirks;
     try {
-      this.connection = DriverManager.getConnection(target);
-    } catch (SQLException e) {
-      throw JdbcDriverUtil.fromSqlException(e);
+      this.client = FlightClient.builder(allocator, location).build();
+    } catch (FlightRuntimeException e) {
+      throw FlightSqlDriverUtil.fromFlightException(e);
     }
     this.counter = new AtomicInteger();
   }
+  /**
+   * Opens a new connection with its own Flight client and its own child allocator.
+   *
+   * @throws AdbcException if the Flight client cannot be built
+   */
   @Override
   public AdbcConnection connect() throws AdbcException {
-    final Connection connection;
+    final FlightClient client;
     try {
-      connection = DriverManager.getConnection(target);
-    } catch (SQLException e) {
-      throw JdbcDriverUtil.fromSqlException(e);
+      client = FlightClient.builder(allocator, location).build();
+    } catch (FlightRuntimeException e) {
+      throw FlightSqlDriverUtil.fromFlightException(e);
     }
     final int count = counter.getAndIncrement();
-    return new JdbcConnection(
-        allocator.newChildAllocator("adbc-jdbc-connection-" + count, 0, allocator.getLimit()),
-        connection,
+    return new FlightSqlConnection(
+        // Name the child allocator after this driver so allocator reports identify its owner.
+        allocator.newChildAllocator(
+            "adbc-flight-sql-connection-" + count, 0, allocator.getLimit()),
+        client,
         quirks);
   }
   @Override
   public void close() throws Exception {
-    connection.close();
+    client.close();
   }
   @Override
   public String toString() {
-    return "JdbcDatabase{" + "target='" + target + '\'' + '}';
+    return "FlightSqlDatabase{" + "target='" + location + '\'' + '}';
   }
 }
diff --git
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriver.java
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDriver.java
similarity index 57%
copy from
java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriver.java
copy to
java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDriver.java
index 341c849..045d728 100644
---
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriver.java
+++
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDriver.java
@@ -14,41 +14,56 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.arrow.adbc.driver.jdbc;
+package org.apache.arrow.adbc.driver.flightsql;
+import java.net.URISyntaxException;
import java.util.Map;
import org.apache.arrow.adbc.core.AdbcDatabase;
import org.apache.arrow.adbc.core.AdbcDriver;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.drivermanager.AdbcDriverManager;
+import org.apache.arrow.adbc.sql.SqlQuirks;
+import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Preconditions;
-public enum JdbcDriver implements AdbcDriver {
+/**
+ * An ADBC driver wrapping Arrow Flight SQL.
+ *
+ * <p>The single instance registers itself with the {@link AdbcDriverManager} under the name
+ * {@code org.apache.arrow.adbc.driver.flightsql} when it is constructed.
+ */
+public enum FlightSqlDriver implements AdbcDriver {
   INSTANCE;
   private final BufferAllocator allocator;
-  JdbcDriver() {
+  FlightSqlDriver() {
     allocator = new RootAllocator();
-    AdbcDriverManager.getInstance().registerDriver("org.apache.arrow.adbc.driver.jdbc", this);
+    AdbcDriverManager.getInstance().registerDriver("org.apache.arrow.adbc.driver.flightsql", this);
   }
+  /**
+   * Opens a database handle for the Flight SQL server named by the parameters.
+   *
+   * @param parameters must map {@code PARAM_URL} to a String location URI; may map {@code
+   *     PARAM_SQL_QUIRKS} to a {@link SqlQuirks} instance (a default one is used otherwise)
+   * @throws AdbcException with an invalid-argument status if the URL is missing, is not a String
+   *     or is not a valid URI
+   * @throws IllegalArgumentException if the quirks parameter is present but is not a {@link
+   *     SqlQuirks}
+   */
   @Override
   public AdbcDatabase open(Map<String, Object> parameters) throws AdbcException {
-    Object target = parameters.get("adbc.jdbc.url");
+    // Look the URL up under the same key that the error message below tells callers to set.
+    Object target = parameters.get(PARAM_URL);
     if (!(target instanceof String)) {
-      throw AdbcException.invalidArgument("[JDBC] Must provide String adbc.jdbc.url parameter");
+      throw AdbcException.invalidArgument(
+          "[Flight SQL] Must provide String " + PARAM_URL + " parameter");
     }
-    Object quirks = parameters.get("adbc.jdbc.quirks");
+    Location location;
+    try {
+      location = new Location((String) target);
+    } catch (URISyntaxException e) {
+      throw AdbcException.invalidArgument(
+              String.format("[Flight SQL] Location %s is invalid: %s", target, e))
+          .withCause(e);
+    }
+    Object quirks = parameters.get(PARAM_SQL_QUIRKS);
     if (quirks != null) {
       Preconditions.checkArgument(
-          quirks instanceof JdbcDriverQuirks,
-          "[JDBC] adbc.jdbc.quirks must be a JdbcDriverQuirks instance");
+          quirks instanceof SqlQuirks,
+          String.format(
+              "[Flight SQL] %s must be a SqlQuirks instance, not %s",
+              PARAM_SQL_QUIRKS, quirks.getClass().getName()));
     } else {
-      quirks = new JdbcDriverQuirks();
+      quirks = new SqlQuirks();
     }
-    return new JdbcDatabase(allocator, (String) target, (JdbcDriverQuirks) quirks);
+    return new FlightSqlDatabase(allocator, location, (SqlQuirks) quirks);
   }
 }
diff --git
a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDriverUtil.java
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDriverUtil.java
new file mode 100644
index 0000000..cb6b303
--- /dev/null
+++
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDriverUtil.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adbc.driver.flightsql;
+
+import java.sql.SQLException;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.core.AdbcStatusCode;
+import org.apache.arrow.flight.FlightRuntimeException;
+import org.apache.arrow.flight.FlightStatusCode;
+
+/** Static helpers that translate lower-level failures into {@link AdbcException}s. */
+final class FlightSqlDriverUtil {
+  private FlightSqlDriverUtil() {
+    throw new AssertionError("Do not instantiate this class");
+  }
+
+  /** Prepends the driver tag {@code "[Flight SQL] "} to a message. */
+  static String prefixExceptionMessage(final String s) {
+    return "[Flight SQL] " + s;
+  }
+
+  /**
+   * Converts a {@link SQLException} into an {@link AdbcException} with status {@code UNKNOWN},
+   * carrying over the SQL state and vendor error code.
+   *
+   * @param e the exception to convert; it becomes the cause of the result
+   */
+  static AdbcException fromSqlException(SQLException e) {
+    // Attach the exception itself (not just its cause) so its stack trace is not lost.
+    return new AdbcException(
+        prefixExceptionMessage(e.getMessage()),
+        e,
+        AdbcStatusCode.UNKNOWN,
+        e.getSQLState(),
+        e.getErrorCode());
+  }
+
+  /**
+   * Maps a Flight status code to the corresponding ADBC status code.
+   *
+   * @param code a non-OK Flight status code
+   * @return the matching ADBC code, or {@code UNKNOWN} for codes without a mapping
+   * @throws IllegalArgumentException if {@code code} is {@code OK}, which is not an error
+   */
+  static AdbcStatusCode fromFlightStatusCode(FlightStatusCode code) {
+    switch (code) {
+      case OK:
+        throw new IllegalArgumentException("Cannot convert OK status");
+      case UNKNOWN:
+        return AdbcStatusCode.UNKNOWN;
+      case INTERNAL:
+        return AdbcStatusCode.INTERNAL;
+      case INVALID_ARGUMENT:
+        return AdbcStatusCode.INVALID_ARGUMENT;
+      case TIMED_OUT:
+        return AdbcStatusCode.TIMEOUT;
+      case NOT_FOUND:
+        return AdbcStatusCode.NOT_FOUND;
+      case ALREADY_EXISTS:
+        return AdbcStatusCode.ALREADY_EXISTS;
+      case CANCELLED:
+        return AdbcStatusCode.CANCELLED;
+      case UNAUTHENTICATED:
+        return AdbcStatusCode.UNAUTHENTICATED;
+      case UNAUTHORIZED:
+        return AdbcStatusCode.UNAUTHORIZED;
+      case UNIMPLEMENTED:
+        return AdbcStatusCode.NOT_IMPLEMENTED;
+      case UNAVAILABLE:
+        return AdbcStatusCode.IO;
+      default:
+        return AdbcStatusCode.UNKNOWN;
+    }
+  }
+
+  /**
+   * Converts a {@link FlightRuntimeException} into an {@link AdbcException} whose status is
+   * derived from the Flight status code.
+   *
+   * @param e the exception to convert; it becomes the cause of the result
+   */
+  static AdbcException fromFlightException(FlightRuntimeException e) {
+    // Tag the message like every other error raised by this driver, and keep the exception
+    // itself as the cause so its stack trace is not lost.
+    return new AdbcException(
+        prefixExceptionMessage(e.getMessage()),
+        e,
+        fromFlightStatusCode(e.status().code()),
+        null,
+        0);
+  }
+}
diff --git
a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlStatement.java
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlStatement.java
new file mode 100644
index 0000000..3a3e072
--- /dev/null
+++
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlStatement.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adbc.driver.flightsql;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.core.AdbcStatusCode;
+import org.apache.arrow.adbc.core.BulkIngestMode;
+import org.apache.arrow.adbc.core.PartitionDescriptor;
+import org.apache.arrow.adbc.sql.SqlQuirks;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightRuntimeException;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.impl.Flight;
+import org.apache.arrow.flight.sql.FlightSqlClient;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Field;
+
+/**
+ * An {@link AdbcStatement} that runs against a Flight SQL server.
+ *
+ * <p>A statement is in one of three modes: a SQL query (set through {@link #setSqlQuery}), a
+ * bulk ingest created by {@link #ingestRoot}, or a pre-resolved result created by {@link
+ * #fromDescriptor}. Executing a query records the resulting Flight endpoints; the data itself
+ * is only fetched by the reader returned from {@link #getArrowReader()}.
+ */
+public class FlightSqlStatement implements AdbcStatement {
+  private final BufferAllocator allocator;
+  private final FlightSqlClient client;
+  private final LoadingCache<Location, FlightClient> clientCache;
+  private final SqlQuirks quirks;
+
+  // State for SQL queries
+  private String sqlQuery;
+  private FlightSqlClient.PreparedStatement preparedStatement;
+  private List<FlightEndpoint> flightEndpoints;
+  private ArrowReader reader;
+  // State for bulk ingest
+  private BulkState bulkOperation;
+  private VectorSchemaRoot bindRoot;
+
+  FlightSqlStatement(
+      BufferAllocator allocator,
+      FlightSqlClient client,
+      LoadingCache<Location, FlightClient> clientCache,
+      SqlQuirks quirks) {
+    this.allocator = allocator;
+    this.client = client;
+    this.clientCache = clientCache;
+    this.quirks = quirks;
+    this.sqlQuery = null;
+  }
+
+  /**
+   * Creates a statement that ingests bound data into a table.
+   *
+   * @param targetTableName the table to insert into; must not be null
+   * @param mode when {@code CREATE}, the table is created before inserting
+   */
+  static FlightSqlStatement ingestRoot(
+      BufferAllocator allocator,
+      FlightSqlClient client,
+      LoadingCache<Location, FlightClient> clientCache,
+      SqlQuirks quirks,
+      String targetTableName,
+      BulkIngestMode mode) {
+    Objects.requireNonNull(targetTableName);
+    final FlightSqlStatement statement =
+        new FlightSqlStatement(allocator, client, clientCache, quirks);
+    statement.bulkOperation = new BulkState();
+    statement.bulkOperation.mode = mode;
+    statement.bulkOperation.targetTable = targetTableName;
+    return statement;
+  }
+
+  /**
+   * Creates a statement whose result is already known as a list of endpoints, so that {@link
+   * #getArrowReader()} can be called without executing anything first.
+   */
+  public static AdbcStatement fromDescriptor(
+      BufferAllocator allocator,
+      FlightSqlClient client,
+      LoadingCache<Location, FlightClient> clientCache,
+      SqlQuirks quirks,
+      List<FlightEndpoint> flightEndpoints) {
+    final FlightSqlStatement statement =
+        new FlightSqlStatement(allocator, client, clientCache, quirks);
+    statement.flightEndpoints = flightEndpoints;
+    return statement;
+  }
+
+  /**
+   * Sets the SQL text to run. Any statement previously prepared by {@link #prepare()} is closed
+   * and discarded, because it was prepared for the old text.
+   *
+   * @throws AdbcException if this statement is a bulk ingest, or if discarding the previously
+   *     prepared statement fails
+   */
+  @Override
+  public void setSqlQuery(String query) throws AdbcException {
+    if (bulkOperation != null) {
+      throw AdbcException.invalidState(
+          "[Flight SQL] Statement is configured for a bulk ingest/append operation");
+    }
+    // Otherwise execute() would keep running the statement prepared for the previous query.
+    closePreparedStatement();
+    sqlQuery = query;
+  }
+
+  /** Remembers {@code root} as the parameter/ingest data; the root stays owned by the caller. */
+  @Override
+  public void bind(VectorSchemaRoot root) {
+    bindRoot = root;
+  }
+
+  /**
+   * Runs the bulk ingest or the SQL query, whichever this statement is configured for.
+   *
+   * @throws AdbcException if neither is configured or the server reports an error
+   */
+  @Override
+  public void execute() throws AdbcException {
+    if (bulkOperation != null) {
+      executeBulk();
+    } else if (sqlQuery != null) {
+      executeSqlQuery();
+    } else {
+      throw AdbcException.invalidState("[Flight SQL] Must setSqlQuery() first");
+    }
+  }
+
+  /** Issues a CREATE TABLE whose columns mirror the fields of the bound root. */
+  private void createBulkTable() throws AdbcException {
+    // XXX: table and column names are concatenated into the statement unescaped
+    final StringBuilder create = new StringBuilder("CREATE TABLE ");
+    create.append(bulkOperation.targetTable);
+    create.append(" (");
+    for (int col = 0; col < bindRoot.getFieldVectors().size(); col++) {
+      if (col > 0) {
+        create.append(", ");
+      }
+      final Field field = bindRoot.getVector(col).getField();
+      create.append(field.getName());
+      create.append(' ');
+      String typeName = quirks.getArrowToSqlTypeNameMapping().apply(field.getType());
+      if (typeName == null) {
+        throw AdbcException.notImplemented(
+            "[Flight SQL] Cannot generate CREATE TABLE statement for field " + field);
+      }
+      create.append(typeName);
+    }
+    create.append(")");
+
+    try {
+      client.executeUpdate(create.toString());
+    } catch (FlightRuntimeException e) {
+      throw new AdbcException(
+          "[Flight SQL] Could not create table for bulk ingestion: " + bulkOperation.targetTable,
+          e,
+          AdbcStatusCode.ALREADY_EXISTS,
+          null,
+          0);
+    }
+  }
+
+  /** Creates the table if requested, then inserts the bound root through a prepared INSERT. */
+  private void executeBulk() throws AdbcException {
+    if (bindRoot == null) {
+      throw AdbcException.invalidState("[Flight SQL] Must call bind() before bulk insert");
+    }
+
+    if (bulkOperation.mode == BulkIngestMode.CREATE) {
+      createBulkTable();
+    }
+
+    // XXX: potential injection
+    final StringBuilder insert = new StringBuilder("INSERT INTO ");
+    insert.append(bulkOperation.targetTable);
+    insert.append(" VALUES (");
+    for (int col = 0; col < bindRoot.getFieldVectors().size(); col++) {
+      if (col > 0) {
+        insert.append(", ");
+      }
+      insert.append("?");
+    }
+    insert.append(")");
+
+    final FlightSqlClient.PreparedStatement statement;
+    try {
+      statement = client.prepare(insert.toString());
+    } catch (FlightRuntimeException e) {
+      throw new AdbcException(
+          "[Flight SQL] Could not prepare statement for bulk ingestion into "
+              + bulkOperation.targetTable,
+          e,
+          AdbcStatusCode.NOT_FOUND,
+          null,
+          0);
+    }
+    try {
+      try {
+        statement.setParameters(new NonOwningRoot(bindRoot));
+        statement.executeUpdate();
+      } finally {
+        statement.close();
+      }
+    } catch (FlightRuntimeException e) {
+      throw FlightSqlDriverUtil.fromFlightException(e);
+    }
+  }
+
+  /** Runs the query (prepared or ad hoc) and records the endpoints of its result. */
+  private void executeSqlQuery() throws AdbcException {
+    closeUnreadReader();
+    try {
+      if (preparedStatement != null) {
+        // TODO: This binds only the LAST row
+        if (bindRoot != null) {
+          preparedStatement.setParameters(new NonOwningRoot(bindRoot));
+        }
+        // XXX(ARROW-17199): why does this throw SQLException?
+        flightEndpoints = preparedStatement.execute().getEndpoints();
+      } else {
+        flightEndpoints = client.execute(sqlQuery).getEndpoints();
+      }
+    } catch (FlightRuntimeException e) {
+      throw FlightSqlDriverUtil.fromFlightException(e);
+    } catch (SQLException e) {
+      throw FlightSqlDriverUtil.fromSqlException(e);
+    }
+  }
+
+  /**
+   * Returns a reader over the current result and hands ownership of it to the caller.
+   *
+   * @throws AdbcException if there is no result to read yet
+   */
+  @Override
+  public ArrowReader getArrowReader() throws AdbcException {
+    if (reader != null) {
+      ArrowReader result = reader;
+      reader = null;
+      return result;
+    }
+    if (flightEndpoints == null) {
+      throw AdbcException.invalidState("[Flight SQL] Must call execute() before getArrowReader()");
+    }
+    final ArrowReader endpointReader =
+        new FlightInfoReader(allocator, client, clientCache, flightEndpoints);
+    // The endpoints are consumed: a second call needs another execute().
+    flightEndpoints = null;
+    return endpointReader;
+  }
+
+  /**
+   * Serializes each endpoint of the current result (its ticket and locations) into a descriptor
+   * that {@code deserializePartitionDescriptor} on a connection can turn back into a statement.
+   *
+   * @throws AdbcException if there is no result yet
+   */
+  @Override
+  public List<PartitionDescriptor> getPartitionDescriptors() throws AdbcException {
+    if (flightEndpoints == null) {
+      throw AdbcException.invalidState(
+          "[Flight SQL] Must call execute() before getPartitionDescriptors()");
+    }
+    final List<PartitionDescriptor> result = new ArrayList<>();
+    for (final FlightEndpoint endpoint : flightEndpoints) {
+      // FlightEndpoint doesn't expose its serializer, so do it manually
+      Flight.FlightEndpoint.Builder protoEndpoint =
+          Flight.FlightEndpoint.newBuilder()
+              .setTicket(
+                  Flight.Ticket.newBuilder()
+                      .setTicket(ByteString.copyFrom(endpoint.getTicket().getBytes())));
+      for (final Location location : endpoint.getLocations()) {
+        protoEndpoint.addLocation(
+            Flight.Location.newBuilder().setUri(location.getUri().toString()).build());
+      }
+      result.add(
+          new PartitionDescriptor(protoEndpoint.build().toByteString().asReadOnlyByteBuffer()));
+    }
+    return result;
+  }
+
+  /**
+   * Prepares the current SQL query on the server, replacing (and closing) any statement that
+   * was prepared earlier.
+   *
+   * @throws AdbcException if no query was set or the server rejects the statement
+   */
+  @Override
+  public void prepare() throws AdbcException {
+    if (sqlQuery == null) {
+      throw AdbcException.invalidArgument(
+          "[Flight SQL] Must call setSqlQuery(String) before prepare()");
+    }
+    closeUnreadReader();
+    // Release the previous server-side statement instead of silently dropping the handle.
+    closePreparedStatement();
+    try {
+      preparedStatement = client.prepare(sqlQuery);
+    } catch (FlightRuntimeException e) {
+      throw FlightSqlDriverUtil.fromFlightException(e);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    AutoCloseables.close(reader, preparedStatement);
+  }
+
+  /** Closes and forgets a result reader that was never handed out, if there is one. */
+  private void closeUnreadReader() throws AdbcException {
+    if (reader == null) {
+      return;
+    }
+    final ArrowReader unread = reader;
+    // Forget the reader first so that close() does not close it a second time.
+    reader = null;
+    try {
+      unread.close();
+    } catch (IOException e) {
+      throw new AdbcException(
+          "[Flight SQL] Failed to close unread result set",
+          e,
+          AdbcStatusCode.IO,
+          null, /*vendorCode*/
+          0);
+    }
+  }
+
+  /** Closes and forgets the current prepared statement, if there is one. */
+  private void closePreparedStatement() throws AdbcException {
+    if (preparedStatement == null) {
+      return;
+    }
+    final FlightSqlClient.PreparedStatement stale = preparedStatement;
+    preparedStatement = null;
+    try {
+      stale.close();
+    } catch (FlightRuntimeException e) {
+      throw FlightSqlDriverUtil.fromFlightException(e);
+    }
+  }
+
+  /** The settings of a bulk ingest: the target table and whether to create it first. */
+  private static final class BulkState {
+    public BulkIngestMode mode;
+    String targetTable;
+  }
+
+  /** A VectorSchemaRoot which does not own its data. */
+  private static final class NonOwningRoot extends VectorSchemaRoot {
+    public NonOwningRoot(VectorSchemaRoot parent) {
+      super(parent.getSchema(), parent.getFieldVectors(), parent.getRowCount());
+    }
+
+    // Closing must not release vectors that still belong to the caller's root.
+    @Override
+    public void close() {}
+  }
+}
diff --git
a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/InfoMetadataBuilder.java
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/InfoMetadataBuilder.java
new file mode 100644
index 0000000..318405d
--- /dev/null
+++
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/InfoMetadataBuilder.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adbc.driver.flightsql;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.core.AdbcInfoCode;
+import org.apache.arrow.adbc.core.StandardSchemas;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightRuntimeException;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.sql.FlightSqlClient;
+import org.apache.arrow.flight.sql.impl.FlightSql;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.UInt4Vector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.DenseUnionVector;
+
+/**
+ * Helper class to track state needed to build up the info structure.
+ *
+ * <p>The builder owns the root it fills until {@link #build()} returns it. Closing the builder
+ * releases the root only if it was never handed out, so callers can always close the builder.
+ */
+final class InfoMetadataBuilder implements AutoCloseable {
+  private static final byte STRING_VALUE_TYPE_ID = (byte) 0;
+  /** ADBC info code to the Flight SQL info code that provides it. */
+  private static final Map<Integer, Integer> ADBC_TO_FLIGHT_SQL_CODES = new HashMap<>();
+  /** Flight SQL info code to the routine that copies its value into the result. */
+  private static final Map<Integer, AddInfo> SUPPORTED_CODES = new HashMap<>();
+
+  /** The ADBC info codes to report. */
+  private final Collection<Integer> requestedCodes;
+  private final FlightSqlClient client;
+  private VectorSchemaRoot root;
+
+  private final UInt4Vector infoCodes;
+  private final DenseUnionVector infoValues;
+  private final VarCharVector stringValues;
+
+  /** Copies row {@code srcIndex} of a server response into row {@code dstIndex} of the result. */
+  @FunctionalInterface
+  interface AddInfo {
+    void accept(InfoMetadataBuilder builder, DenseUnionVector sqlInfo, int srcIndex, int dstIndex);
+  }
+
+  static {
+    ADBC_TO_FLIGHT_SQL_CODES.put(
+        AdbcInfoCode.VENDOR_NAME.getValue(), FlightSql.SqlInfo.FLIGHT_SQL_SERVER_NAME.getNumber());
+    ADBC_TO_FLIGHT_SQL_CODES.put(
+        AdbcInfoCode.VENDOR_VERSION.getValue(),
+        FlightSql.SqlInfo.FLIGHT_SQL_SERVER_VERSION.getNumber());
+
+    SUPPORTED_CODES.put(
+        FlightSql.SqlInfo.FLIGHT_SQL_SERVER_NAME.getNumber(),
+        (b, sqlInfo, srcIndex, dstIndex) -> {
+          b.infoCodes.setSafe(dstIndex, AdbcInfoCode.VENDOR_NAME.getValue());
+          b.setStringValue(dstIndex, sqlInfo.getVarCharVector(STRING_VALUE_TYPE_ID).get(srcIndex));
+        });
+    SUPPORTED_CODES.put(
+        FlightSql.SqlInfo.FLIGHT_SQL_SERVER_VERSION.getNumber(),
+        (b, sqlInfo, srcIndex, dstIndex) -> {
+          b.infoCodes.setSafe(dstIndex, AdbcInfoCode.VENDOR_VERSION.getValue());
+          b.setStringValue(dstIndex, sqlInfo.getVarCharVector(STRING_VALUE_TYPE_ID).get(srcIndex));
+        });
+  }
+
+  /**
+   * Creates a builder.
+   *
+   * @param allocator allocator for the result root
+   * @param client client used to query the server's info
+   * @param infoCodes the ADBC info codes to report, or {@code null} for every code this driver
+   *     knows how to provide
+   */
+  InfoMetadataBuilder(BufferAllocator allocator, FlightSqlClient client, int[] infoCodes) {
+    if (infoCodes == null) {
+      // requestedCodes holds ADBC codes (build() translates them), so the default must be
+      // seeded from the ADBC side of the mapping, not from the Flight SQL codes.
+      this.requestedCodes = new ArrayList<>(ADBC_TO_FLIGHT_SQL_CODES.keySet());
+      this.requestedCodes.add(AdbcInfoCode.DRIVER_NAME.getValue());
+      this.requestedCodes.add(AdbcInfoCode.DRIVER_VERSION.getValue());
+    } else {
+      this.requestedCodes = IntStream.of(infoCodes).boxed().collect(Collectors.toList());
+    }
+    this.client = client;
+    this.root = VectorSchemaRoot.create(StandardSchemas.GET_INFO_SCHEMA, allocator);
+    this.infoCodes = (UInt4Vector) root.getVector(0);
+    this.infoValues = (DenseUnionVector) root.getVector(1);
+    this.stringValues = this.infoValues.getVarCharVector(STRING_VALUE_TYPE_ID);
+  }
+
+  /** Stores {@code value} as the string-typed union value of result row {@code index}. */
+  void setStringValue(int index, byte[] value) {
+    infoValues.setValueCount(index + 1);
+    infoValues.setTypeId(index, STRING_VALUE_TYPE_ID);
+    stringValues.setSafe(index, value);
+    // Point the union row at the string that was just written.
+    infoValues
+        .getOffsetBuffer()
+        .setInt((long) index * DenseUnionVector.OFFSET_WIDTH, stringValues.getLastSet());
+  }
+
+  /**
+   * Fills and returns the result root. Driver-level codes are answered locally; codes that map
+   * to Flight SQL info are fetched from the server. Unrecognized codes are ignored.
+   *
+   * @return the filled root, now owned by the caller
+   * @throws AdbcException if querying the server fails
+   */
+  VectorSchemaRoot build() throws AdbcException {
+    // XXX: rather hacky, we need a better way to do this
+    int dstIndex = 0;
+
+    List<Integer> translatedCodes = new ArrayList<>();
+    for (int code : requestedCodes) {
+      Integer translatedCode = ADBC_TO_FLIGHT_SQL_CODES.get(code);
+      if (translatedCode != null) {
+        translatedCodes.add(translatedCode);
+      } else if (code == AdbcInfoCode.DRIVER_NAME.getValue()) {
+        infoCodes.setSafe(dstIndex, code);
+        setStringValue(dstIndex++, "ADBC Flight SQL Driver".getBytes(StandardCharsets.UTF_8));
+      } else if (code == AdbcInfoCode.DRIVER_VERSION.getValue()) {
+        infoCodes.setSafe(dstIndex, code);
+        // TODO: actual version
+        setStringValue(dstIndex++, "0.0.1".getBytes(StandardCharsets.UTF_8));
+      }
+    }
+
+    // An empty code list asks a Flight SQL server for *all* of its info, which would add rows
+    // the caller never requested, so only contact the server when there is something to fetch.
+    if (!translatedCodes.isEmpty()) {
+      dstIndex = appendServerInfo(translatedCodes, dstIndex);
+    }
+
+    root.setRowCount(dstIndex);
+    VectorSchemaRoot result = root;
+    root = null;
+    return result;
+  }
+
+  /**
+   * Fetches the given Flight SQL info codes from the server and appends every supported value
+   * to the result, starting at row {@code dstIndex}.
+   *
+   * @return the index of the next free result row
+   */
+  private int appendServerInfo(List<Integer> flightSqlCodes, int dstIndex) throws AdbcException {
+    final FlightInfo info;
+    try {
+      info = client.getSqlInfo(flightSqlCodes);
+    } catch (FlightRuntimeException e) {
+      throw FlightSqlDriverUtil.fromFlightException(e);
+    }
+
+    for (final FlightEndpoint endpoint : info.getEndpoints()) {
+      // TODO: this should account for locations property
+      try (final FlightStream stream = client.getStream(endpoint.getTicket())) {
+        // The stream's root only holds data after next() has loaded a batch into it.
+        while (stream.next()) {
+          final VectorSchemaRoot batch = stream.getRoot();
+          final UInt4Vector sqlCode = (UInt4Vector) batch.getVector(0);
+          final DenseUnionVector sqlInfo = (DenseUnionVector) batch.getVector(1);
+          for (int srcIndex = 0; srcIndex < batch.getRowCount(); srcIndex++) {
+            final AddInfo addInfo = SUPPORTED_CODES.get(sqlCode.get(srcIndex));
+            if (addInfo != null) {
+              addInfo.accept(this, sqlInfo, srcIndex, dstIndex++);
+            }
+          }
+        }
+      } catch (FlightRuntimeException e) {
+        throw FlightSqlDriverUtil.fromFlightException(e);
+      } catch (Exception e) {
+        throw AdbcException.io("[Flight SQL] " + e.getMessage()).withCause(e);
+      }
+    }
+    return dstIndex;
+  }
+
+  /** Releases the root unless {@link #build()} has already handed it to the caller. */
+  @Override
+  public void close() throws Exception {
+    AutoCloseables.close(root);
+  }
+}
diff --git
a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/RootArrowReader.java
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/RootArrowReader.java
new file mode 100644
index 0000000..0325fd7
--- /dev/null
+++
b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/RootArrowReader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adbc.driver.flightsql;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/**
+ * An {@link ArrowReader} backed by an in-memory list of {@link ArrowRecordBatch}es.
+ *
+ * <p>Each successful call to {@link #loadNextBatch()} loads the next batch of the list, in
+ * list order, into the reader's root.
+ */
+class RootArrowReader extends ArrowReader {
+  private final Schema schema;
+  private final List<ArrowRecordBatch> batches;
+  int nextIndex;
+
+  /**
+   * Create a reader over the given batches.
+   *
+   * @param allocator passed through to the {@link ArrowReader} constructor
+   * @param schema the schema reported by {@link #readSchema()}
+   * @param batches the batches to serve; handed to AutoCloseables.close when the read
+   *     source is closed
+   */
+  public RootArrowReader(
+      BufferAllocator allocator, Schema schema, List<ArrowRecordBatch> batches) {
+    super(allocator);
+    this.schema = schema;
+    this.batches = batches;
+    this.nextIndex = 0;
+  }
+
+  /**
+   * Load the next unread batch into the root.
+   *
+   * @return true if a batch was loaded, false once every batch has been served
+   */
+  @Override
+  public boolean loadNextBatch() throws IOException {
+    if (nextIndex >= batches.size()) {
+      return false;
+    }
+    // Obtain the root first so a failure there leaves nextIndex untouched.
+    final VectorLoader loader = new VectorLoader(getVectorSchemaRoot());
+    final ArrowRecordBatch batch = batches.get(nextIndex++);
+    loader.load(batch);
+    return true;
+  }
+
+  /** Always reports 0; this reader does not track a byte count. */
+  @Override
+  public long bytesRead() {
+    return 0;
+  }
+
+  /**
+   * Hand the wrapped batches to AutoCloseables.close.
+   *
+   * @throws IOException wrapping any exception raised while closing
+   */
+  @Override
+  protected void closeReadSource() throws IOException {
+    try {
+      AutoCloseables.close(batches);
+    } catch (Exception cause) {
+      throw new IOException(cause);
+    }
+  }
+
+  /** Return the schema supplied at construction. */
+  @Override
+  protected Schema readSchema() {
+    return schema;
+  }
+}
diff --git
a/java/driver/jdbc-validation-derby/src/test/java/org/apache/arrow/adbc/driver/jdbc/derby/DerbyQuirks.java
b/java/driver/jdbc-validation-derby/src/test/java/org/apache/arrow/adbc/driver/jdbc/derby/DerbyQuirks.java
index 877f512..b8a4def 100644
---
a/java/driver/jdbc-validation-derby/src/test/java/org/apache/arrow/adbc/driver/jdbc/derby/DerbyQuirks.java
+++
b/java/driver/jdbc-validation-derby/src/test/java/org/apache/arrow/adbc/driver/jdbc/derby/DerbyQuirks.java
@@ -25,6 +25,7 @@ import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import org.apache.arrow.adbc.core.AdbcDatabase;
+import org.apache.arrow.adbc.core.AdbcDriver;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.driver.jdbc.JdbcDriver;
import org.apache.arrow.adbc.driver.testsuite.SqlValidationQuirks;
@@ -39,7 +40,7 @@ public class DerbyQuirks extends SqlValidationQuirks {
@Override
public AdbcDatabase initDatabase() throws AdbcException {
final Map<String, Object> parameters = new HashMap<>();
- parameters.put("adbc.jdbc.url", jdbcUrl);
+ parameters.put(AdbcDriver.PARAM_URL, jdbcUrl);
return JdbcDriver.INSTANCE.open(parameters);
}
diff --git
a/java/driver/jdbc-validation-postgresql/src/test/java/org/apache/arrow/adbc/driver/jdbc/postgresql/PostgresqlQuirks.java
b/java/driver/jdbc-validation-postgresql/src/test/java/org/apache/arrow/adbc/driver/jdbc/postgresql/PostgresqlQuirks.java
index a0a1885..126adac 100644
---
a/java/driver/jdbc-validation-postgresql/src/test/java/org/apache/arrow/adbc/driver/jdbc/postgresql/PostgresqlQuirks.java
+++
b/java/driver/jdbc-validation-postgresql/src/test/java/org/apache/arrow/adbc/driver/jdbc/postgresql/PostgresqlQuirks.java
@@ -24,10 +24,11 @@ import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import org.apache.arrow.adbc.core.AdbcDatabase;
+import org.apache.arrow.adbc.core.AdbcDriver;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.driver.jdbc.JdbcDriver;
-import org.apache.arrow.adbc.driver.jdbc.JdbcDriverQuirks;
import org.apache.arrow.adbc.driver.testsuite.SqlValidationQuirks;
+import org.apache.arrow.adbc.sql.SqlQuirks;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.junit.jupiter.api.Assumptions;
@@ -52,17 +53,16 @@ public class PostgresqlQuirks extends SqlValidationQuirks {
String url = makeJdbcUrl();
final Map<String, Object> parameters = new HashMap<>();
- parameters.put("adbc.jdbc.url", url);
+ parameters.put(AdbcDriver.PARAM_URL, url);
parameters.put(
- "adbc.jdbc.quirks",
- JdbcDriverQuirks.builder()
+ AdbcDriver.PARAM_SQL_QUIRKS,
+ SqlQuirks.builder()
.arrowToSqlTypeNameMapping(
(arrowType -> {
if (arrowType.getTypeID() == ArrowType.ArrowTypeID.Utf8) {
return "TEXT";
}
- return
JdbcDriverQuirks.DEFAULT_ARROW_TYPE_TO_SQL_TYPE_NAME_MAPPING.apply(
- arrowType);
+ return
SqlQuirks.DEFAULT_ARROW_TYPE_TO_SQL_TYPE_NAME_MAPPING.apply(arrowType);
}))
.build());
return JdbcDriver.INSTANCE.open(parameters);
diff --git a/java/driver/jdbc/pom.xml b/java/driver/jdbc/pom.xml
index 0ea8d43..05b0e9f 100644
--- a/java/driver/jdbc/pom.xml
+++ b/java/driver/jdbc/pom.xml
@@ -50,37 +50,9 @@
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-manager</artifactId>
</dependency>
-
- <!-- Derby -->
- <!-- Cannot upgrade beyond this version for Java 8 support -->
- <dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derby</artifactId>
- <version>10.14.2.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derbytools</artifactId>
- <version>10.14.2.0</version>
- <scope>test</scope>
- </dependency>
-
- <!-- Testing -->
- <dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
- <artifactId>adbc-driver-validation</artifactId>
- <scope>test</scope>
+ <artifactId>adbc-sql</artifactId>
</dependency>
</dependencies>
</project>
diff --git
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java
b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java
index 5bbb3b0..5982236 100644
---
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java
+++
b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java
@@ -29,6 +29,7 @@ import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.core.BulkIngestMode;
import org.apache.arrow.adbc.core.StandardSchemas;
+import org.apache.arrow.adbc.sql.SqlQuirks;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
@@ -39,9 +40,9 @@ import org.apache.arrow.vector.types.pojo.Schema;
public class JdbcConnection implements AdbcConnection {
private final BufferAllocator allocator;
private final Connection connection;
- private final JdbcDriverQuirks quirks;
+ private final SqlQuirks quirks;
- JdbcConnection(BufferAllocator allocator, Connection connection,
JdbcDriverQuirks quirks) {
+ JdbcConnection(BufferAllocator allocator, Connection connection, SqlQuirks
quirks) {
this.allocator = allocator;
this.connection = connection;
this.quirks = quirks;
diff --git
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDatabase.java
b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDatabase.java
index 4cef0e4..7c8a2a2 100644
---
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDatabase.java
+++
b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDatabase.java
@@ -24,17 +24,18 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.arrow.adbc.core.AdbcConnection;
import org.apache.arrow.adbc.core.AdbcDatabase;
import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.sql.SqlQuirks;
import org.apache.arrow.memory.BufferAllocator;
/** An instance of a database (e.g. a handle to an in-memory database). */
public final class JdbcDatabase implements AdbcDatabase {
private final BufferAllocator allocator;
private final String target;
- private final JdbcDriverQuirks quirks;
+ private final SqlQuirks quirks;
private final Connection connection;
private final AtomicInteger counter;
- JdbcDatabase(BufferAllocator allocator, final String target,
JdbcDriverQuirks quirks)
+ JdbcDatabase(BufferAllocator allocator, final String target, SqlQuirks
quirks)
throws AdbcException {
this.allocator = allocator;
this.target = target;
diff --git
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriver.java
b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriver.java
index 341c849..67db75a 100644
---
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriver.java
+++
b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriver.java
@@ -21,10 +21,12 @@ import org.apache.arrow.adbc.core.AdbcDatabase;
import org.apache.arrow.adbc.core.AdbcDriver;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.drivermanager.AdbcDriverManager;
+import org.apache.arrow.adbc.sql.SqlQuirks;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Preconditions;
+/** An ADBC driver wrapping the JDBC API. */
public enum JdbcDriver implements AdbcDriver {
INSTANCE;
@@ -37,18 +39,20 @@ public enum JdbcDriver implements AdbcDriver {
@Override
public AdbcDatabase open(Map<String, Object> parameters) throws
AdbcException {
- Object target = parameters.get("adbc.jdbc.url");
+ Object target = parameters.get(PARAM_URL);
if (!(target instanceof String)) {
- throw AdbcException.invalidArgument("[JDBC] Must provide String
adbc.jdbc.url parameter");
+ throw AdbcException.invalidArgument("[JDBC] Must provide String " +
PARAM_URL + " parameter");
}
- Object quirks = parameters.get("adbc.jdbc.quirks");
+ Object quirks = parameters.get(PARAM_SQL_QUIRKS);
if (quirks != null) {
Preconditions.checkArgument(
- quirks instanceof JdbcDriverQuirks,
- "[JDBC] adbc.jdbc.quirks must be a JdbcDriverQuirks instance");
+ quirks instanceof SqlQuirks,
+ String.format(
+ "[JDBC] %s must be a SqlQuirks instance, not %s",
+ PARAM_SQL_QUIRKS, quirks.getClass().getName()));
} else {
- quirks = new JdbcDriverQuirks();
+ quirks = new SqlQuirks();
}
- return new JdbcDatabase(allocator, (String) target, (JdbcDriverQuirks)
quirks);
+ return new JdbcDatabase(allocator, (String) target, (SqlQuirks) quirks);
}
}
diff --git
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java
b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java
index df523d4..1d243bd 100644
---
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java
+++
b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java
@@ -29,6 +29,7 @@ import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.core.AdbcStatusCode;
import org.apache.arrow.adbc.core.BulkIngestMode;
import org.apache.arrow.adbc.driver.jdbc.util.JdbcParameterBinder;
+import org.apache.arrow.adbc.sql.SqlQuirks;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -38,7 +39,7 @@ import org.apache.arrow.vector.types.pojo.Field;
public class JdbcStatement implements AdbcStatement {
private final BufferAllocator allocator;
private final Connection connection;
- private final JdbcDriverQuirks quirks;
+ private final SqlQuirks quirks;
// State for SQL queries
private Statement statement;
@@ -49,7 +50,7 @@ public class JdbcStatement implements AdbcStatement {
private BulkState bulkOperation;
private VectorSchemaRoot bindRoot;
- JdbcStatement(BufferAllocator allocator, Connection connection,
JdbcDriverQuirks quirks) {
+ JdbcStatement(BufferAllocator allocator, Connection connection, SqlQuirks
quirks) {
this.allocator = allocator;
this.connection = connection;
this.quirks = quirks;
@@ -59,7 +60,7 @@ public class JdbcStatement implements AdbcStatement {
static JdbcStatement ingestRoot(
BufferAllocator allocator,
Connection connection,
- JdbcDriverQuirks quirks,
+ SqlQuirks quirks,
String targetTableName,
BulkIngestMode mode) {
Objects.requireNonNull(targetTableName);
@@ -179,7 +180,11 @@ public class JdbcStatement implements AdbcStatement {
reader.close();
} catch (IOException e) {
throw new AdbcException(
- "Failed to close unread result set", e, AdbcStatusCode.IO, null,
/*vendorCode*/ 0);
+ "[JDBC] Failed to close unread result set",
+ e,
+ AdbcStatusCode.IO,
+ null, /*vendorCode*/
+ 0);
}
}
if (resultSet != null) {
@@ -225,6 +230,10 @@ public class JdbcStatement implements AdbcStatement {
@Override
public void prepare() throws AdbcException {
try {
+ if (sqlQuery == null) {
+ throw AdbcException.invalidArgument(
+ "[JDBC] Must call setSqlQuery(String) before prepare()");
+ }
if (resultSet != null) {
resultSet.close();
}
diff --git
a/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java
b/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java
index ecb0f4e..46ae770 100644
---
a/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java
+++
b/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java
@@ -79,7 +79,7 @@ public abstract class AbstractConnectionMetadataTest {
}
@Test
- void getInfo() throws Exception {
+ public void getInfo() throws Exception {
try (final AdbcStatement stmt = connection.getInfo()) {
try (final ArrowReader reader = stmt.getArrowReader()) {
assertThat(reader.getVectorSchemaRoot().getSchema())
@@ -91,7 +91,7 @@ public abstract class AbstractConnectionMetadataTest {
}
@Test
- void getInfoByCode() throws Exception {
+ public void getInfoByCode() throws Exception {
try (final AdbcStatement stmt =
connection.getInfo(new AdbcInfoCode[] {AdbcInfoCode.DRIVER_NAME})) {
try (final ArrowReader reader = stmt.getArrowReader()) {
@@ -112,7 +112,7 @@ public abstract class AbstractConnectionMetadataTest {
}
@Test
- void getObjectsColumns() throws Exception {
+ public void getObjectsColumns() throws Exception {
final Schema schema = util.ingestTableIntsStrs(allocator, connection,
tableName);
boolean tableFound = false;
try (final AdbcStatement stmt =
@@ -153,7 +153,7 @@ public abstract class AbstractConnectionMetadataTest {
}
@Test
- void getObjectsCatalogs() throws Exception {
+ public void getObjectsCatalogs() throws Exception {
util.ingestTableIntsStrs(allocator, connection, tableName);
try (final AdbcStatement stmt =
connection.getObjects(
@@ -171,7 +171,7 @@ public abstract class AbstractConnectionMetadataTest {
}
@Test
- void getObjectsDbSchemas() throws Exception {
+ public void getObjectsDbSchemas() throws Exception {
util.ingestTableIntsStrs(allocator, connection, tableName);
try (final AdbcStatement stmt =
connection.getObjects(
@@ -186,7 +186,7 @@ public abstract class AbstractConnectionMetadataTest {
}
@Test
- void getObjectsTables() throws Exception {
+ public void getObjectsTables() throws Exception {
util.ingestTableIntsStrs(allocator, connection, tableName);
try (final AdbcStatement stmt =
connection.getObjects(
@@ -208,7 +208,7 @@ public abstract class AbstractConnectionMetadataTest {
}
@Test
- void getTableSchema() throws Exception {
+ public void getTableSchema() throws Exception {
final Schema schema =
new Schema(
Arrays.asList(
@@ -226,7 +226,7 @@ public abstract class AbstractConnectionMetadataTest {
}
@Test
- void getTableTypes() throws Exception {
+ public void getTableTypes() throws Exception {
try (final AdbcStatement stmt = connection.getTableTypes()) {
try (final ArrowReader reader = stmt.getArrowReader()) {
assertThat(reader.getVectorSchemaRoot().getSchema())
diff --git
a/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractPartitionDescriptorTest.java
b/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractPartitionDescriptorTest.java
new file mode 100644
index 0000000..7246732
--- /dev/null
+++
b/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractPartitionDescriptorTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adbc.driver.testsuite;
+
+import static
org.apache.arrow.adbc.driver.testsuite.ArrowAssertions.assertRoot;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.arrow.adbc.core.AdbcConnection;
+import org.apache.arrow.adbc.core.AdbcDatabase;
+import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.core.BulkIngestMode;
+import org.apache.arrow.adbc.core.PartitionDescriptor;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Driver-independent test of partition descriptors: a query's descriptor is fetched on one
+ * connection and then deserialized and read on a second connection.
+ *
+ * <p>Subclasses supply the driver under test by assigning {@link #quirks}.
+ */
+public abstract class AbstractPartitionDescriptorTest {
+ /** Must be initialized by the subclass. */
+ protected static SqlValidationQuirks quirks;
+
+ protected AdbcDatabase database;
+ protected AdbcConnection connection;
+ protected BufferAllocator allocator;
+ protected SqlTestUtil util;
+ protected String tableName;
+ protected Schema schema;
+
+ /**
+ * Open a database and connection through {@link #quirks}, create an allocator, build the
+ * two-column (ints, strs) schema, and ask quirks to clean up the test table.
+ */
+ @BeforeEach
+ public void beforeEach() throws Exception {
+ Preconditions.checkNotNull(quirks, "Must initialize quirks in subclass
with @BeforeAll");
+ database = quirks.initDatabase();
+ connection = database.connect();
+ allocator = new RootAllocator();
+ util = new SqlTestUtil(quirks);
+ // Table and column names go through quirks so each database's case folding is respected.
+ tableName = quirks.caseFoldTableName("bulktable");
+ schema =
+ new Schema(
+ Arrays.asList(
+ Field.nullable(
+ quirks.caseFoldColumnName("ints"), new ArrowType.Int(32,
/*signed=*/ true)),
+ Field.nullable(quirks.caseFoldColumnName("strs"), new
ArrowType.Utf8())));
+ quirks.cleanupTable(tableName);
+ }
+
+ /** Ask quirks to clean up the test table, then close the connection, database, and allocator. */
+ @AfterEach
+ public void afterEach() throws Exception {
+ quirks.cleanupTable(tableName);
+ AutoCloseables.close(connection, database, allocator);
+ }
+
+ /**
+ * Ingest four rows, execute a SELECT over them, and check that the single partition
+ * descriptor it yields can be deserialized on a second connection and read back equal to
+ * the ingested data.
+ */
+ @Test
+ public void serializeDeserializeQuery() throws Exception {
+ try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ final IntVector ints = (IntVector) root.getVector(0);
+ final VarCharVector strs = (VarCharVector) root.getVector(1);
+
+ // Four rows; ints is null at row 3 and strs is null at row 0.
+ ints.allocateNew(4);
+ ints.setSafe(0, 0);
+ ints.setSafe(1, 1);
+ ints.setSafe(2, 2);
+ ints.setNull(3);
+ strs.allocateNew(4);
+ strs.setNull(0);
+ strs.setSafe(1, "foo".getBytes(StandardCharsets.UTF_8));
+ strs.setSafe(2, "".getBytes(StandardCharsets.UTF_8));
+ strs.setSafe(3, "asdf".getBytes(StandardCharsets.UTF_8));
+ root.setRowCount(4);
+
+ // Ingest the root into the test table using BulkIngestMode.CREATE.
+ try (final AdbcStatement stmt = connection.bulkIngest(tableName,
BulkIngestMode.CREATE)) {
+ stmt.bind(root);
+ stmt.execute();
+ }
+ // Execute the query and keep its partition descriptors past the statement's lifetime.
+ final List<PartitionDescriptor> descriptors;
+ try (final AdbcStatement stmt = connection.createStatement()) {
+ stmt.setSqlQuery("SELECT * FROM " + tableName);
+ stmt.execute();
+ descriptors = stmt.getPartitionDescriptors();
+ // For convenience, assume database won't shard 4 rows over more than
1 partition…
+ assertThat(descriptors).hasSize(1);
+ }
+
+ // The serialized partition descriptor should be executable on a
separate connection
+ try (final AdbcConnection connection2 = database.connect();
+ final AdbcStatement stmt =
+
connection2.deserializePartitionDescriptor(descriptors.get(0).getDescriptor());
+ final ArrowReader reader = stmt.getArrowReader()) {
+ // Only the first batch is compared against the ingested root.
+ assertThat(reader.loadNextBatch()).isTrue();
+
assertThat(reader.getVectorSchemaRoot().getSchema()).isEqualTo(root.getSchema());
+ assertRoot(reader.getVectorSchemaRoot()).isEqualTo(root);
+ }
+ }
+ }
+}
diff --git
a/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractStatementTest.java
b/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractStatementTest.java
index 947cd3d..c6fc97c 100644
---
a/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractStatementTest.java
+++
b/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractStatementTest.java
@@ -71,6 +71,7 @@ public abstract class AbstractStatementTest {
Field.nullable(
quirks.caseFoldColumnName("ints"), new ArrowType.Int(32,
/*signed=*/ true)),
Field.nullable(quirks.caseFoldColumnName("strs"), new
ArrowType.Utf8())));
+ quirks.cleanupTable(tableName);
}
@AfterEach
@@ -80,7 +81,7 @@ public abstract class AbstractStatementTest {
}
@Test
- public void bulkInsertAppend() throws Exception {
+ public void bulkIngestAppend() throws Exception {
try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
final IntVector ints = (IntVector) root.getVector(0);
final VarCharVector strs = (VarCharVector) root.getVector(1);
@@ -140,6 +141,7 @@ public abstract class AbstractStatementTest {
Collections.singletonList(
Field.nullable(quirks.caseFoldColumnName("ints"), new
ArrowType.Utf8())));
try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ root.setRowCount(1);
try (final AdbcStatement stmt = connection.bulkIngest(tableName,
BulkIngestMode.CREATE)) {
stmt.bind(root);
stmt.execute();
@@ -169,6 +171,7 @@ public abstract class AbstractStatementTest {
@Test
public void bulkIngestCreateConflict() throws Exception {
try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ root.setRowCount(1);
try (final AdbcStatement stmt = connection.bulkIngest(tableName,
BulkIngestMode.CREATE)) {
stmt.bind(root);
stmt.execute();
@@ -178,7 +181,7 @@ public abstract class AbstractStatementTest {
try (final AdbcStatement stmt = connection.bulkIngest(tableName,
BulkIngestMode.CREATE)) {
stmt.bind(root);
final AdbcException e = assertThrows(AdbcException.class,
stmt::execute);
- assertThat(e.getStatus()).isEqualTo(AdbcStatusCode.ALREADY_EXISTS);
+ assertThat(e.getStatus()).describedAs("%s",
e).isEqualTo(AdbcStatusCode.ALREADY_EXISTS);
}
}
}
@@ -193,7 +196,9 @@ public abstract class AbstractStatementTest {
assertThat(reader.getVectorSchemaRoot().getSchema()).isEqualTo(expectedSchema);
assertThat(reader.loadNextBatch()).isTrue();
assertThat(reader.getVectorSchemaRoot().getRowCount()).isEqualTo(4);
- assertThat(reader.loadNextBatch()).isFalse();
+ while (reader.loadNextBatch()) {
+ assertThat(reader.getVectorSchemaRoot().getRowCount()).isEqualTo(0);
+ }
}
}
}
diff --git a/java/pom.xml b/java/pom.xml
index 11987e1..fc9bcea 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -77,12 +77,15 @@
<modules>
<module>core</module>
+ <module>driver/flight-sql</module>
+ <module>driver/flight-sql-validation</module>
<module>driver/jdbc</module>
<module>driver/jdbc-util</module>
<module>driver/jdbc-validation-derby</module>
<module>driver/jdbc-validation-postgresql</module>
<module>driver/validation</module>
<module>driver-manager</module>
+ <module>sql</module>
</modules>
<dependencyManagement>
@@ -103,12 +106,28 @@
<artifactId>arrow-vector</artifactId>
<version>${dep.arrow.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>flight-core</artifactId>
+ <version>${dep.arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>flight-sql</artifactId>
+ <version>${dep.arrow.version}</version>
+ </dependency>
+ <!-- ADBC -->
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-core</artifactId>
<version>${adbc.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.arrow.adbc</groupId>
+ <artifactId>adbc-driver-flight-sql</artifactId>
+ <version>${adbc.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-jdbc</artifactId>
@@ -129,6 +148,11 @@
<artifactId>adbc-driver-manager</artifactId>
<version>${adbc.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.arrow.adbc</groupId>
+ <artifactId>adbc-sql</artifactId>
+ <version>${adbc.version}</version>
+ </dependency>
<!-- Testing -->
<dependency>
diff --git a/java/driver/jdbc/pom.xml b/java/sql/pom.xml
similarity index 53%
copy from java/driver/jdbc/pom.xml
copy to java/sql/pom.xml
index 0ea8d43..5009d67 100644
--- a/java/driver/jdbc/pom.xml
+++ b/java/sql/pom.xml
@@ -15,57 +15,19 @@
<artifactId>arrow-adbc-java-root</artifactId>
<groupId>org.apache.arrow.adbc</groupId>
<version>9.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
</parent>
- <artifactId>adbc-driver-jdbc</artifactId>
+ <artifactId>adbc-sql</artifactId>
<packaging>jar</packaging>
- <name>Arrow ADBC Driver JDBC</name>
- <description>An ADBC driver wrapping the JDBC API.</description>
+ <name>Arrow ADBC SQL</name>
+ <description>Common utilities for SQL-based ADBC drivers.</description>
<dependencies>
- <!-- Arrow -->
- <dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-jdbc</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-memory-core</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.arrow.adbc</groupId>
- <artifactId>adbc-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.arrow.adbc</groupId>
- <artifactId>adbc-driver-jdbc-util</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.arrow.adbc</groupId>
- <artifactId>adbc-driver-manager</artifactId>
- </dependency>
-
- <!-- Derby -->
- <!-- Cannot upgrade beyond this version for Java 8 support -->
- <dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derby</artifactId>
- <version>10.14.2.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derbytools</artifactId>
- <version>10.14.2.0</version>
- <scope>test</scope>
- </dependency>
-
<!-- Testing -->
<dependency>
<groupId>org.assertj</groupId>
@@ -77,10 +39,5 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.arrow.adbc</groupId>
- <artifactId>adbc-driver-validation</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
</project>
diff --git
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriverQuirks.java
b/java/sql/src/main/java/org/apache/arrow/adbc/sql/SqlQuirks.java
similarity index 89%
rename from
java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriverQuirks.java
rename to java/sql/src/main/java/org/apache/arrow/adbc/sql/SqlQuirks.java
index 99c87e6..007f699 100644
---
a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriverQuirks.java
+++ b/java/sql/src/main/java/org/apache/arrow/adbc/sql/SqlQuirks.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.arrow.adbc.driver.jdbc;
+package org.apache.arrow.adbc.sql;
import java.util.function.Function;
import org.apache.arrow.vector.types.pojo.ArrowType;
-/** Parameters to pass to the ADBC JDBC driver to account for
driver/vendor-specific quirks. */
-public final class JdbcDriverQuirks {
+/** Parameters to pass to SQL-based drivers to account for
driver/vendor-specific SQL quirks. */
+public final class SqlQuirks {
public static final Function<ArrowType, String>
DEFAULT_ARROW_TYPE_TO_SQL_TYPE_NAME_MAPPING =
(arrowType) -> {
switch (arrowType.getTypeID()) {
@@ -58,7 +58,7 @@ public final class JdbcDriverQuirks {
};
Function<ArrowType, String> arrowToSqlTypeNameMapping;
- public JdbcDriverQuirks() {
+ public SqlQuirks() {
this.arrowToSqlTypeNameMapping =
DEFAULT_ARROW_TYPE_TO_SQL_TYPE_NAME_MAPPING;
}
@@ -80,8 +80,8 @@ public final class JdbcDriverQuirks {
return this;
}
- public JdbcDriverQuirks build() {
- final JdbcDriverQuirks quirks = new JdbcDriverQuirks();
+ public SqlQuirks build() {
+ final SqlQuirks quirks = new SqlQuirks();
if (arrowToSqlTypeNameMapping != null) {
quirks.arrowToSqlTypeNameMapping = arrowToSqlTypeNameMapping;
}
diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
b/java/sql/src/main/java/org/apache/arrow/adbc/sql/package-info.java
similarity index 71%
copy from java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
copy to java/sql/src/main/java/org/apache/arrow/adbc/sql/package-info.java
index 8b71f95..7ed2c8b 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java
+++ b/java/sql/src/main/java/org/apache/arrow/adbc/sql/package-info.java
@@ -15,16 +15,9 @@
* limitations under the License.
*/
-package org.apache.arrow.adbc.core;
-
-import java.util.Map;
-
-/** A handle to an ADBC database driver. */
-public interface AdbcDriver {
- /**
- * Open a database via this driver.
- *
- * @param parameters Driver-specific parameters.
- */
- AdbcDatabase open(Map<String, Object> parameters) throws AdbcException;
-}
+/**
+ * This module contains common utilities for drivers working with SQL.
+ *
+ * <p>ADBC is currently experimental.
+ */
+package org.apache.arrow.adbc.sql;