This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new cecd7710f3 GH-41947: [Java] Support catalog in JDBC driver with
session options (#42035)
cecd7710f3 is described below
commit cecd7710f31ae99758c58bcff42ec824b4291156
Author: Steve Lord <[email protected]>
AuthorDate: Thu Jun 13 16:40:48 2024 -0700
GH-41947: [Java] Support catalog in JDBC driver with session options
(#42035)
### Rationale for this change
See Issue https://github.com/apache/arrow/issues/41947
### What changes are included in this PR?
### Are these changes tested?
Yes
### Are there any user-facing changes?
Introduction of an optional catalog query parameter in the JDBC URL string.
* GitHub Issue: #41947
Authored-by: Steve Lord <[email protected]>
Signed-off-by: David Li <[email protected]>
---
java/flight/flight-sql-jdbc-core/pom.xml | 7 ++
.../arrow/driver/jdbc/ArrowFlightConnection.java | 2 +
.../jdbc/client/ArrowFlightSqlClientHandler.java | 74 ++++++++++++++++++++--
.../utils/ArrowFlightConnectionConfigImpl.java | 12 +++-
.../ArrowFlightSqlClientHandlerBuilderTest.java | 27 ++++++++
.../utils/ArrowFlightConnectionConfigImplTest.java | 7 ++
6 files changed, 123 insertions(+), 6 deletions(-)
diff --git a/java/flight/flight-sql-jdbc-core/pom.xml
b/java/flight/flight-sql-jdbc-core/pom.xml
index 7fe4e7f18c..7ea96b3e55 100644
--- a/java/flight/flight-sql-jdbc-core/pom.xml
+++ b/java/flight/flight-sql-jdbc-core/pom.xml
@@ -137,6 +137,13 @@ under the License.
<artifactId>bcpkix-jdk18on</artifactId>
<version>1.78.1</version>
</dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>3.0.2</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
index 24d72eb3f0..c1b1c8f8e6 100644
---
a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
+++
b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
@@ -112,6 +112,7 @@ public final class ArrowFlightConnection extends
AvaticaConnection {
.withCallOptions(config.toCallOption())
.withRetainCookies(config.retainCookies())
.withRetainAuth(config.retainAuth())
+ .withCatalog(config.getCatalog())
.build();
} catch (final SQLException e) {
try {
@@ -171,6 +172,7 @@ public final class ArrowFlightConnection extends
AvaticaConnection {
@Override
public void close() throws SQLException {
+ clientHandler.close();
if (executorService != null) {
executorService.shutdown();
}
diff --git
a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
index f3553ae2f0..845f5372d3 100644
---
a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
+++
b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
@@ -16,6 +16,7 @@
*/
package org.apache.arrow.driver.jdbc.client;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.net.URI;
import java.security.GeneralSecurityException;
@@ -25,9 +26,14 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.arrow.driver.jdbc.client.utils.ClientAuthenticationUtils;
import org.apache.arrow.flight.CallOption;
+import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.CloseSessionRequest;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightClientMiddleware;
import org.apache.arrow.flight.FlightEndpoint;
@@ -36,6 +42,10 @@ import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightStatusCode;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.LocationSchemes;
+import org.apache.arrow.flight.SessionOptionValue;
+import org.apache.arrow.flight.SessionOptionValueFactory;
+import org.apache.arrow.flight.SetSessionOptionsRequest;
+import org.apache.arrow.flight.SetSessionOptionsResult;
import org.apache.arrow.flight.auth2.BearerCredentialWriter;
import org.apache.arrow.flight.auth2.ClientBearerHeaderHandler;
import org.apache.arrow.flight.auth2.ClientIncomingAuthHeaderMiddleware;
@@ -57,19 +67,24 @@ import org.slf4j.LoggerFactory;
/** A {@link FlightSqlClient} handler. */
public final class ArrowFlightSqlClientHandler implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(ArrowFlightSqlClientHandler.class);
+ // JDBC connection string query parameter
+ private static final String CATALOG = "catalog";
private final FlightSqlClient sqlClient;
private final Set<CallOption> options = new HashSet<>();
private final Builder builder;
+ private final Optional<String> catalog;
ArrowFlightSqlClientHandler(
final FlightSqlClient sqlClient,
final Builder builder,
- final Collection<CallOption> credentialOptions) {
+ final Collection<CallOption> credentialOptions,
+ final Optional<String> catalog) {
this.options.addAll(builder.options);
this.options.addAll(credentialOptions);
this.sqlClient = Preconditions.checkNotNull(sqlClient);
this.builder = builder;
+ this.catalog = catalog;
}
/**
@@ -80,9 +95,15 @@ public final class ArrowFlightSqlClientHandler implements
AutoCloseable {
* @param options the {@link CallOption}s to persist in between subsequent
client calls.
* @return a new {@link ArrowFlightSqlClientHandler}.
*/
- public static ArrowFlightSqlClientHandler createNewHandler(
- final FlightClient client, final Builder builder, final
Collection<CallOption> options) {
- return new ArrowFlightSqlClientHandler(new FlightSqlClient(client),
builder, options);
+ static ArrowFlightSqlClientHandler createNewHandler(
+ final FlightClient client,
+ final Builder builder,
+ final Collection<CallOption> options,
+ final Optional<String> catalog) {
+ final ArrowFlightSqlClientHandler handler =
+ new ArrowFlightSqlClientHandler(new FlightSqlClient(client), builder,
options, catalog);
+ handler.setSetCatalogInSessionIfPresent();
+ return handler;
}
/**
@@ -199,6 +220,9 @@ public final class ArrowFlightSqlClientHandler implements
AutoCloseable {
@Override
public void close() throws SQLException {
+ if (catalog.isPresent()) {
+ sqlClient.closeSession(new CloseSessionRequest(), getOptions());
+ }
try {
AutoCloseables.close(sqlClient);
} catch (final Exception e) {
@@ -250,6 +274,31 @@ public final class ArrowFlightSqlClientHandler implements
AutoCloseable {
void close();
}
+ /** A connection is created with catalog set as a session option. */
+ private void setSetCatalogInSessionIfPresent() {
+ if (catalog.isPresent()) {
+ final SetSessionOptionsRequest setSessionOptionRequest =
+ new SetSessionOptionsRequest(
+ ImmutableMap.<String, SessionOptionValue>builder()
+ .put(CATALOG,
SessionOptionValueFactory.makeSessionOptionValue(catalog.get()))
+ .build());
+ final SetSessionOptionsResult result =
+ sqlClient.setSessionOptions(setSessionOptionRequest, getOptions());
+
+ if (result.hasErrors()) {
+ Map<String, SetSessionOptionsResult.Error> errors = result.getErrors();
+ for (Map.Entry<String, SetSessionOptionsResult.Error> error :
errors.entrySet()) {
+ LOGGER.warn(error.toString());
+ }
+ throw CallStatus.INVALID_ARGUMENT
+ .withDescription(
+ String.format(
+ "Cannot set session option for catalog = %s. Check log for
details.", catalog))
+ .toRuntimeException();
+ }
+ }
+ }
+
/**
* Creates a new {@link PreparedStatement} for the given {@code query}.
*
@@ -492,6 +541,8 @@ public final class ArrowFlightSqlClientHandler implements
AutoCloseable {
@VisibleForTesting boolean retainAuth = true;
+ @VisibleForTesting Optional<String> catalog = Optional.empty();
+
// These two middleware are for internal use within build() and should not
be exposed by builder
// APIs.
// Note that these middleware may not necessarily be registered.
@@ -527,6 +578,7 @@ public final class ArrowFlightSqlClientHandler implements
AutoCloseable {
this.clientCertificatePath = original.clientCertificatePath;
this.clientKeyPath = original.clientKeyPath;
this.allocator = original.allocator;
+ this.catalog = original.catalog;
if (original.retainCookies) {
this.cookieFactory = original.cookieFactory;
@@ -762,6 +814,17 @@ public final class ArrowFlightSqlClientHandler implements
AutoCloseable {
return this;
}
+ /**
+ * Sets the catalog for this handler if it is not null.
+ *
+ * @param catalog the catalog
+ * @return this instance.
+ */
+ public Builder withCatalog(@Nullable final String catalog) {
+ this.catalog = Optional.ofNullable(catalog);
+ return this;
+ }
+
/**
* Builds a new {@link ArrowFlightSqlClientHandler} from the provided
fields.
*
@@ -841,7 +904,8 @@ public final class ArrowFlightSqlClientHandler implements
AutoCloseable {
new CredentialCallOption(new BearerCredentialWriter(token)),
options.toArray(new CallOption[0])));
}
- return ArrowFlightSqlClientHandler.createNewHandler(client, this,
credentialOptions);
+ return ArrowFlightSqlClientHandler.createNewHandler(
+ client, this, credentialOptions, catalog);
} catch (final IllegalArgumentException
| GeneralSecurityException
diff --git
a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java
b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java
index fcb53519da..e8bae2a207 100644
---
a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java
+++
b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java
@@ -154,6 +154,15 @@ public final class ArrowFlightConnectionConfigImpl extends
ConnectionConfigImpl
return ArrowFlightConnectionProperty.RETAIN_AUTH.getBoolean(properties);
}
+ /**
+ * The catalog to which a connection is made.
+ *
+ * @return the catalog.
+ */
+ public String getCatalog() {
+ return ArrowFlightConnectionProperty.CATALOG.getString(properties);
+ }
+
/**
* Gets the {@link CallOption}s from this {@link ConnectionConfig}.
*
@@ -203,7 +212,8 @@ public final class ArrowFlightConnectionConfigImpl extends
ConnectionConfigImpl
THREAD_POOL_SIZE("threadPoolSize", 1, Type.NUMBER, false),
TOKEN("token", null, Type.STRING, false),
RETAIN_COOKIES("retainCookies", true, Type.BOOLEAN, false),
- RETAIN_AUTH("retainAuth", true, Type.BOOLEAN, false);
+ RETAIN_AUTH("retainAuth", true, Type.BOOLEAN, false),
+ CATALOG("catalog", null, Type.STRING, false);
private final String camelName;
private final Object defaultValue;
diff --git
a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java
b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java
index 604bc4b700..2af2d0117b 100644
---
a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java
+++
b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java
@@ -16,12 +16,14 @@
*/
package org.apache.arrow.driver.jdbc.client;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import java.util.Optional;
import org.apache.arrow.driver.jdbc.FlightServerTestRule;
import org.apache.arrow.driver.jdbc.utils.CoreMockedSqlProducers;
import org.apache.arrow.memory.BufferAllocator;
@@ -142,5 +144,30 @@ public class ArrowFlightSqlClientHandlerBuilderTest {
assertNull(builder.tlsRootCertificatesPath);
assertNull(builder.clientCertificatePath);
assertNull(builder.clientKeyPath);
+ assertEquals(Optional.empty(), builder.catalog);
+ }
+
+ @Test
+ public void testCatalog() {
+ ArrowFlightSqlClientHandler.Builder rootBuilder = new
ArrowFlightSqlClientHandler.Builder();
+
+ rootBuilder.withCatalog(null);
+ assertFalse(rootBuilder.catalog.isPresent());
+
+ rootBuilder.withCatalog("");
+ assertTrue(rootBuilder.catalog.isPresent());
+
+ rootBuilder.withCatalog(" ");
+ assertTrue(rootBuilder.catalog.isPresent());
+
+ final String noSpaces = "noSpaces";
+ rootBuilder.withCatalog(noSpaces);
+ assertTrue(rootBuilder.catalog.isPresent());
+ assertEquals(noSpaces, rootBuilder.catalog.get());
+
+ final String nameWithSpaces = " spaces ";
+ rootBuilder.withCatalog(nameWithSpaces);
+ assertTrue(rootBuilder.catalog.isPresent());
+ assertEquals(nameWithSpaces, rootBuilder.catalog.get());
}
}
diff --git
a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImplTest.java
b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImplTest.java
index 616f6e4b36..56c5f0c755 100644
---
a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImplTest.java
+++
b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImplTest.java
@@ -18,6 +18,7 @@ package org.apache.arrow.driver.jdbc.utils;
import static java.lang.Runtime.getRuntime;
import static java.util.Arrays.asList;
+import static
org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.CATALOG;
import static
org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.HOST;
import static
org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.PASSWORD;
import static
org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.PORT;
@@ -107,6 +108,12 @@ public final class ArrowFlightConnectionConfigImplTest {
(Function<ArrowFlightConnectionConfigImpl, ?>)
ArrowFlightConnectionConfigImpl::threadPoolSize
},
+ {
+ CATALOG,
+ "catalog",
+ (Function<ArrowFlightConnectionConfigImpl, ?>)
+ ArrowFlightConnectionConfigImpl::getCatalog
+ },
});
}
}