This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-java.git
The following commit(s) were added to refs/heads/main by this push:
new b9e40fa0a GH-942: Fix JDBC Connection.setCatalog() (#943)
b9e40fa0a is described below
commit b9e40fa0a90143eef36ca53d2f937f2fef494402
Author: André Eickler <[email protected]>
AuthorDate: Mon Jan 5 07:39:15 2026 +0100
GH-942: Fix JDBC Connection.setCatalog() (#943)
## What's Changed
Connection.setCatalog() is no longer silently ignored (which was the
behavior of the default implementation in Calcite); instead, it updates the
catalog session option in the same way as during the initial connection.
Closes #942.
---
.../arrow/driver/jdbc/ArrowFlightMetaImpl.java | 23 ++++++-
.../jdbc/client/ArrowFlightSqlClientHandler.java | 80 ++++++++++++++--------
.../apache/arrow/driver/jdbc/ConnectionTest.java | 40 ++++++++++-
.../driver/jdbc/utils/MockFlightSqlProducer.java | 19 +++++
4 files changed, 131 insertions(+), 31 deletions(-)
diff --git
a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java
index 21cc3e431..64529b50c 100644
---
a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java
+++
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java
@@ -79,7 +79,8 @@ public class ArrowFlightMetaImpl extends MetaImpl {
public void closeStatement(final StatementHandle statementHandle) {
PreparedStatement preparedStatement =
statementHandlePreparedStatementMap.remove(new
StatementHandleKey(statementHandle));
- // Testing if the prepared statement was created because the statement can
be not created until
+ // Testing if the prepared statement was created because the statement can
be
+ // not created until
// this moment
if (preparedStatement != null) {
preparedStatement.close();
@@ -224,7 +225,8 @@ public class ArrowFlightMetaImpl extends MetaImpl {
MetaResultSet.create(handle.connectionId, handle.id, false,
handle.signature, null);
return new ExecuteResult(Collections.singletonList(metaResultSet));
} catch (SQLTimeoutException e) {
- // So far AvaticaStatement(executeInternal) only handles NoSuchStatement
and Runtime
+ // So far AvaticaStatement(executeInternal) only handles NoSuchStatement
and
+ // Runtime
// Exceptions.
throw new RuntimeException(e);
} catch (SQLException e) {
@@ -253,6 +255,20 @@ public class ArrowFlightMetaImpl extends MetaImpl {
return false;
}
+ @Override
+ public ConnectionProperties connectionSync(ConnectionHandle ch,
ConnectionProperties connProps) {
+ final ConnectionProperties result = super.connectionSync(ch, connProps);
+ final String newCatalog = this.connProps.getCatalog();
+ if (newCatalog != null) {
+ try {
+ ((ArrowFlightConnection)
connection).getClientHandler().setCatalog(newCatalog);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return result;
+ }
+
void setDefaultConnectionProperties() {
// TODO Double-check this.
connProps
@@ -268,7 +284,8 @@ public class ArrowFlightMetaImpl extends MetaImpl {
return statementHandlePreparedStatementMap.get(new
StatementHandleKey(statementHandle));
}
- // Helper used to look up prepared statement instances later. Avatica
doesn't give us the
+ // Helper used to look up prepared statement instances later. Avatica doesn't
+ // give us the
// signature in
// an UPDATE code path so we can't directly use StatementHandle as a map key.
private static final class StatementHandleKey {
diff --git
a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
index 5dc7e0e2e..666996cd9 100644
---
a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
+++
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
@@ -47,7 +47,6 @@ import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightStatusCode;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.LocationSchemes;
-import org.apache.arrow.flight.SessionOptionValue;
import org.apache.arrow.flight.SessionOptionValueFactory;
import org.apache.arrow.flight.SetSessionOptionsRequest;
import org.apache.arrow.flight.SetSessionOptionsResult;
@@ -147,20 +146,26 @@ public final class ArrowFlightSqlClientHandler implements
AutoCloseable {
try {
for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
if (endpoint.getLocations().isEmpty()) {
- // Create a stream using the current client only and do not close
the client at the end.
+ // Create a stream using the current client only and do not close
the client at
+ // the end.
endpoints.add(
new CloseableEndpointStreamPair(
sqlClient.getStream(endpoint.getTicket(), getOptions()),
null));
} else {
// Clone the builder and then set the new endpoint on it.
- // GH-38574: Currently a new FlightClient will be made for each
partition that returns a
- // non-empty Location then disposed of. It may be better to cache
clients because a server
- // may report the same Locations. It would also be good to identify
when the reported
+ // GH-38574: Currently a new FlightClient will be made for each
partition that
+ // returns a
+ // non-empty Location then disposed of. It may be better to cache
clients
+ // because a server
+ // may report the same Locations. It would also be good to identify
when the
+ // reported
// location
- // is the same as the original connection's Location and skip
creating a FlightClient in
+ // is the same as the original connection's Location and skip
creating a
+ // FlightClient in
// that scenario.
- // Also copy the cache to the client so we can share a cache. Cache
needs to cache
+ // Also copy the cache to the client so we can share a cache. Cache
needs to
+ // cache
// negative attempts too.
List<Exception> exceptions = new ArrayList<>();
CloseableEndpointStreamPair stream = null;
@@ -337,7 +342,8 @@ public final class ArrowFlightSqlClientHandler implements
AutoCloseable {
*/
private void logSuppressedCloseException(
FlightRuntimeException fre, String operationDescription) {
- // ARROW-17785 and GH-863: suppress exceptions caused by flaky gRPC layer
during shutdown
+ // ARROW-17785 and GH-863: suppress exceptions caused by flaky gRPC layer
during
+ // shutdown
LOGGER.debug("Suppressed error {}", operationDescription, fre);
}
@@ -388,25 +394,40 @@ public final class ArrowFlightSqlClientHandler implements
AutoCloseable {
/** A connection is created with catalog set as a session option. */
private void setSetCatalogInSessionIfPresent() {
if (catalog.isPresent()) {
- final SetSessionOptionsRequest setSessionOptionRequest =
- new SetSessionOptionsRequest(
- ImmutableMap.<String, SessionOptionValue>builder()
- .put(CATALOG,
SessionOptionValueFactory.makeSessionOptionValue(catalog.get()))
- .build());
- final SetSessionOptionsResult result =
- sqlClient.setSessionOptions(setSessionOptionRequest, getOptions());
+ try {
+ setCatalog(catalog.get());
+ } catch (SQLException e) {
+ throw CallStatus.INVALID_ARGUMENT
+ .withDescription(e.getMessage())
+ .withCause(e)
+ .toRuntimeException();
+ }
+ }
+ }
+ /**
+ * Sets the catalog for the current session.
+ *
+ * @param catalog the catalog to set.
+ * @throws SQLException if an error occurs while setting the catalog.
+ */
+ public void setCatalog(final String catalog) throws SQLException {
+ final SetSessionOptionsRequest request =
+ new SetSessionOptionsRequest(
+ ImmutableMap.of(CATALOG,
SessionOptionValueFactory.makeSessionOptionValue(catalog)));
+ try {
+ final SetSessionOptionsResult result =
sqlClient.setSessionOptions(request, getOptions());
if (result.hasErrors()) {
- Map<String, SetSessionOptionsResult.Error> errors = result.getErrors();
- for (Map.Entry<String, SetSessionOptionsResult.Error> error :
errors.entrySet()) {
+ final Map<String, SetSessionOptionsResult.Error> errors =
result.getErrors();
+ for (final Map.Entry<String, SetSessionOptionsResult.Error> error :
errors.entrySet()) {
LOGGER.warn(error.toString());
}
- throw CallStatus.INVALID_ARGUMENT
- .withDescription(
- String.format(
- "Cannot set session option for catalog = %s. Check log for
details.", catalog))
- .toRuntimeException();
+ throw new SQLException(
+ String.format(
+ "Cannot set session option for catalog = %s. Check log for
details.", catalog));
}
+ } catch (final FlightRuntimeException e) {
+ throw new SQLException(e);
}
}
@@ -654,7 +675,8 @@ public final class ArrowFlightSqlClientHandler implements
AutoCloseable {
@VisibleForTesting @Nullable Duration connectTimeout;
- // These two middleware are for internal use within build() and should not
be exposed by builder
+ // These two middleware are for internal use within build() and should not
be
+ // exposed by builder
// APIs.
// Note that these middleware may not necessarily be registered.
@VisibleForTesting
@@ -980,7 +1002,8 @@ public final class ArrowFlightSqlClientHandler implements
AutoCloseable {
* @throws SQLException on error.
*/
public ArrowFlightSqlClientHandler build() throws SQLException {
- // Copy middleware so that the build method doesn't change the state of
the builder fields
+ // Copy middleware so that the build method doesn't change the state of
the
+ // builder fields
// itself.
Set<FlightClientMiddleware.Factory> buildTimeMiddlewareFactories =
new HashSet<>(this.middlewareFactories);
@@ -988,7 +1011,8 @@ public final class ArrowFlightSqlClientHandler implements
AutoCloseable {
boolean isUsingUserPasswordAuth = username != null && token == null;
try {
- // Token should take priority since some apps pass in a
username/password even when a token
+ // Token should take priority since some apps pass in a
username/password even
+ // when a token
// is provided
if (isUsingUserPasswordAuth) {
buildTimeMiddlewareFactories.add(authFactory);
@@ -1047,8 +1071,10 @@ public final class ArrowFlightSqlClientHandler
implements AutoCloseable {
allocator, channelBuilder.build(), clientBuilder.middleware());
final ArrayList<CallOption> credentialOptions = new ArrayList<>();
if (isUsingUserPasswordAuth) {
- // If the authFactory has already been used for a handshake, use the
existing token.
- // This can occur if the authFactory is being re-used for a new
connection spawned for
+ // If the authFactory has already been used for a handshake, use the
existing
+ // token.
+ // This can occur if the authFactory is being re-used for a new
connection
+ // spawned for
// getStream().
if (authFactory.getCredentialCallOption() != null) {
credentialOptions.add(authFactory.getCredentialCallOption());
diff --git
a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ConnectionTest.java
b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ConnectionTest.java
index 72e4b222a..46762f331 100644
---
a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ConnectionTest.java
+++
b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ConnectionTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.arrow.driver.jdbc;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -26,12 +27,15 @@ import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
+import java.util.Map;
import java.util.Properties;
import org.apache.arrow.driver.jdbc.authentication.UserPasswordAuthentication;
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
import
org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty;
import org.apache.arrow.driver.jdbc.utils.MockFlightSqlProducer;
import org.apache.arrow.flight.FlightMethod;
+import org.apache.arrow.flight.NoOpSessionOptionValueVisitor;
+import org.apache.arrow.flight.SessionOptionValue;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
@@ -614,7 +618,8 @@ public class ConnectionTest {
var expectedUserAgent =
"JDBC Flight SQL Driver " +
driverVersion.getDriverVersion().versionString;
- // Driver appends version to grpc user-agent header. Assert the header
starts with the
+ // Driver appends version to grpc user-agent header. Assert the header
starts
+ // with the
// expected
// value and ignored grpc version.
assertTrue(
@@ -622,4 +627,37 @@ public class ConnectionTest {
"Expected: " + expectedUserAgent + " but found: " + actualUserAgent);
}
}
+
+ @Test
+ public void testSetCatalogShouldUpdateSessionOptions() throws Exception {
+ final Properties properties = new Properties();
+ properties.put(ArrowFlightConnectionProperty.USER.camelName(), userTest);
+ properties.put(ArrowFlightConnectionProperty.PASSWORD.camelName(),
passTest);
+ properties.put("useEncryption", false);
+
+ try (Connection connection =
+ DriverManager.getConnection(
+ "jdbc:arrow-flight-sql://"
+ + FLIGHT_SERVER_TEST_EXTENSION.getHost()
+ + ":"
+ + FLIGHT_SERVER_TEST_EXTENSION.getPort(),
+ properties)) {
+ final String catalog = "new_catalog";
+ connection.setCatalog(catalog);
+
+ final Map<String, SessionOptionValue> options =
PRODUCER.getSessionOptions();
+ assertTrue(options.containsKey("catalog"));
+ String actualCatalog =
+ options
+ .get("catalog")
+ .acceptVisitor(
+ new NoOpSessionOptionValueVisitor<String>() {
+ @Override
+ public String visit(String value) {
+ return value;
+ }
+ });
+ assertEquals(catalog, actualCatalog);
+ }
+ }
}
diff --git
a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java
b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java
index a8874c486..45c2a9640 100644
---
a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java
+++
b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java
@@ -52,6 +52,9 @@ import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.SchemaResult;
+import org.apache.arrow.flight.SessionOptionValue;
+import org.apache.arrow.flight.SetSessionOptionsRequest;
+import org.apache.arrow.flight.SetSessionOptionsResult;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.sql.FlightSqlProducer;
import org.apache.arrow.flight.sql.SqlInfoBuilder;
@@ -664,6 +667,22 @@ public final class MockFlightSqlProducer implements
FlightSqlProducer {
return sqlInfoBuilder;
}
+ private final Map<String, SessionOptionValue> sessionOptions = new
HashMap<>();
+
+ @Override
+ public void setSessionOptions(
+ final SetSessionOptionsRequest request,
+ final CallContext context,
+ final StreamListener<SetSessionOptionsResult> listener) {
+ sessionOptions.putAll(request.getSessionOptions());
+ listener.onNext(new SetSessionOptionsResult(Collections.emptyMap()));
+ listener.onCompleted();
+ }
+
+ public Map<String, SessionOptionValue> getSessionOptions() {
+ return sessionOptions;
+ }
+
private static final class TicketConversionUtils {
private TicketConversionUtils() {
// Prevent instantiation.