This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new cecd7710f3 GH-41947: [Java] Support catalog in JDBC driver with session options (#42035)
cecd7710f3 is described below

commit cecd7710f31ae99758c58bcff42ec824b4291156
Author: Steve Lord <[email protected]>
AuthorDate: Thu Jun 13 16:40:48 2024 -0700

    GH-41947: [Java] Support catalog in JDBC driver with session options (#42035)
    
    
    
    ### Rationale for this change
    
    See Issue https://github.com/apache/arrow/issues/41947
    
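    ### What changes are included in this PR?
    
    The JDBC driver accepts an optional `catalog` connection property and, when it
    is set, sends it to the server as a Flight session option while the connection
    is being created; the session is closed again when the connection is closed.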
    
    ### Are these changes tested?
    
    Yes
    
    ### Are there any user-facing changes?
    
    Introduction of an optional catalog query parameter in the JDBC URL string.
    
    * GitHub Issue: #41947
    
    Authored-by: Steve Lord <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 java/flight/flight-sql-jdbc-core/pom.xml           |  7 ++
 .../arrow/driver/jdbc/ArrowFlightConnection.java   |  2 +
 .../jdbc/client/ArrowFlightSqlClientHandler.java   | 74 ++++++++++++++++++++--
 .../utils/ArrowFlightConnectionConfigImpl.java     | 12 +++-
 .../ArrowFlightSqlClientHandlerBuilderTest.java    | 27 ++++++++
 .../utils/ArrowFlightConnectionConfigImplTest.java |  7 ++
 6 files changed, 123 insertions(+), 6 deletions(-)

diff --git a/java/flight/flight-sql-jdbc-core/pom.xml b/java/flight/flight-sql-jdbc-core/pom.xml
index 7fe4e7f18c..7ea96b3e55 100644
--- a/java/flight/flight-sql-jdbc-core/pom.xml
+++ b/java/flight/flight-sql-jdbc-core/pom.xml
@@ -137,6 +137,13 @@ under the License.
       <artifactId>bcpkix-jdk18on</artifactId>
       <version>1.78.1</version>
     </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+      <version>3.0.2</version>
+      <scope>compile</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
index 24d72eb3f0..c1b1c8f8e6 100644
--- a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
+++ b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
@@ -112,6 +112,7 @@ public final class ArrowFlightConnection extends AvaticaConnection {
           .withCallOptions(config.toCallOption())
           .withRetainCookies(config.retainCookies())
           .withRetainAuth(config.retainAuth())
+          .withCatalog(config.getCatalog())
           .build();
     } catch (final SQLException e) {
       try {
@@ -171,6 +172,7 @@ public final class ArrowFlightConnection extends AvaticaConnection {
 
   @Override
   public void close() throws SQLException {
+    clientHandler.close();
     if (executorService != null) {
       executorService.shutdown();
     }
diff --git a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
index f3553ae2f0..845f5372d3 100644
--- a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
+++ b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.arrow.driver.jdbc.client;
 
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.net.URI;
 import java.security.GeneralSecurityException;
@@ -25,9 +26,14 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.arrow.driver.jdbc.client.utils.ClientAuthenticationUtils;
 import org.apache.arrow.flight.CallOption;
+import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.CloseSessionRequest;
 import org.apache.arrow.flight.FlightClient;
 import org.apache.arrow.flight.FlightClientMiddleware;
 import org.apache.arrow.flight.FlightEndpoint;
@@ -36,6 +42,10 @@ import org.apache.arrow.flight.FlightRuntimeException;
 import org.apache.arrow.flight.FlightStatusCode;
 import org.apache.arrow.flight.Location;
 import org.apache.arrow.flight.LocationSchemes;
+import org.apache.arrow.flight.SessionOptionValue;
+import org.apache.arrow.flight.SessionOptionValueFactory;
+import org.apache.arrow.flight.SetSessionOptionsRequest;
+import org.apache.arrow.flight.SetSessionOptionsResult;
 import org.apache.arrow.flight.auth2.BearerCredentialWriter;
 import org.apache.arrow.flight.auth2.ClientBearerHeaderHandler;
 import org.apache.arrow.flight.auth2.ClientIncomingAuthHeaderMiddleware;
@@ -57,19 +67,24 @@ import org.slf4j.LoggerFactory;
 /** A {@link FlightSqlClient} handler. */
 public final class ArrowFlightSqlClientHandler implements AutoCloseable {
   private static final Logger LOGGER = LoggerFactory.getLogger(ArrowFlightSqlClientHandler.class);
+  // JDBC connection string query parameter
+  private static final String CATALOG = "catalog";
 
   private final FlightSqlClient sqlClient;
   private final Set<CallOption> options = new HashSet<>();
   private final Builder builder;
+  private final Optional<String> catalog;
 
   ArrowFlightSqlClientHandler(
       final FlightSqlClient sqlClient,
       final Builder builder,
-      final Collection<CallOption> credentialOptions) {
+      final Collection<CallOption> credentialOptions,
+      final Optional<String> catalog) {
     this.options.addAll(builder.options);
     this.options.addAll(credentialOptions);
     this.sqlClient = Preconditions.checkNotNull(sqlClient);
     this.builder = builder;
+    this.catalog = catalog;
   }
 
   /**
@@ -80,9 +95,15 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
    * @param options the {@link CallOption}s to persist in between subsequent client calls.
    * @return a new {@link ArrowFlightSqlClientHandler}.
    */
-  public static ArrowFlightSqlClientHandler createNewHandler(
-      final FlightClient client, final Builder builder, final Collection<CallOption> options) {
-    return new ArrowFlightSqlClientHandler(new FlightSqlClient(client), builder, options);
+  static ArrowFlightSqlClientHandler createNewHandler(
+      final FlightClient client,
+      final Builder builder,
+      final Collection<CallOption> options,
+      final Optional<String> catalog) {
+    final ArrowFlightSqlClientHandler handler =
+        new ArrowFlightSqlClientHandler(new FlightSqlClient(client), builder, options, catalog);
+    handler.setSetCatalogInSessionIfPresent();
+    return handler;
   }
 
   /**
@@ -199,6 +220,9 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
 
   @Override
   public void close() throws SQLException {
+    if (catalog.isPresent()) {
+      sqlClient.closeSession(new CloseSessionRequest(), getOptions());
+    }
     try {
       AutoCloseables.close(sqlClient);
     } catch (final Exception e) {
@@ -250,6 +274,31 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
     void close();
   }
 
+  /** A connection is created with catalog set as a session option. */
+  private void setSetCatalogInSessionIfPresent() {
+    if (catalog.isPresent()) {
+      final SetSessionOptionsRequest setSessionOptionRequest =
+          new SetSessionOptionsRequest(
+              ImmutableMap.<String, SessionOptionValue>builder()
+                  .put(CATALOG, SessionOptionValueFactory.makeSessionOptionValue(catalog.get()))
+                  .build());
+      final SetSessionOptionsResult result =
+          sqlClient.setSessionOptions(setSessionOptionRequest, getOptions());
+
+      if (result.hasErrors()) {
+        Map<String, SetSessionOptionsResult.Error> errors = result.getErrors();
+        for (Map.Entry<String, SetSessionOptionsResult.Error> error : errors.entrySet()) {
+          LOGGER.warn(error.toString());
+        }
+        throw CallStatus.INVALID_ARGUMENT
+            .withDescription(
+                String.format(
+                    "Cannot set session option for catalog = %s. Check log for details.", catalog))
+            .toRuntimeException();
+      }
+    }
+  }
+
   /**
    * Creates a new {@link PreparedStatement} for the given {@code query}.
    *
@@ -492,6 +541,8 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
 
     @VisibleForTesting boolean retainAuth = true;
 
+    @VisibleForTesting Optional<String> catalog = Optional.empty();
+
     // These two middleware are for internal use within build() and should not be exposed by builder
     // APIs.
     // Note that these middleware may not necessarily be registered.
@@ -527,6 +578,7 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
       this.clientCertificatePath = original.clientCertificatePath;
       this.clientKeyPath = original.clientKeyPath;
       this.allocator = original.allocator;
+      this.catalog = original.catalog;
 
       if (original.retainCookies) {
         this.cookieFactory = original.cookieFactory;
@@ -762,6 +814,17 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
       return this;
     }
 
+    /**
+     * Sets the catalog for this handler if it is not null.
+     *
+     * @param catalog the catalog
+     * @return this instance.
+     */
+    public Builder withCatalog(@Nullable final String catalog) {
+      this.catalog = Optional.ofNullable(catalog);
+      return this;
+    }
+
     /**
      * Builds a new {@link ArrowFlightSqlClientHandler} from the provided fields.
      *
@@ -841,7 +904,8 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
                   new CredentialCallOption(new BearerCredentialWriter(token)),
                   options.toArray(new CallOption[0])));
         }
-        return ArrowFlightSqlClientHandler.createNewHandler(client, this, credentialOptions);
+        return ArrowFlightSqlClientHandler.createNewHandler(
+            client, this, credentialOptions, catalog);
 
       } catch (final IllegalArgumentException
           | GeneralSecurityException
diff --git a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java
index fcb53519da..e8bae2a207 100644
--- a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java
+++ b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java
@@ -154,6 +154,15 @@ public final class ArrowFlightConnectionConfigImpl extends ConnectionConfigImpl
     return ArrowFlightConnectionProperty.RETAIN_AUTH.getBoolean(properties);
   }
 
+  /**
+   * The catalog to which a connection is made.
+   *
+   * @return the catalog.
+   */
+  public String getCatalog() {
+    return ArrowFlightConnectionProperty.CATALOG.getString(properties);
+  }
+
   /**
    * Gets the {@link CallOption}s from this {@link ConnectionConfig}.
    *
@@ -203,7 +212,8 @@ public final class ArrowFlightConnectionConfigImpl extends ConnectionConfigImpl
     THREAD_POOL_SIZE("threadPoolSize", 1, Type.NUMBER, false),
     TOKEN("token", null, Type.STRING, false),
     RETAIN_COOKIES("retainCookies", true, Type.BOOLEAN, false),
-    RETAIN_AUTH("retainAuth", true, Type.BOOLEAN, false);
+    RETAIN_AUTH("retainAuth", true, Type.BOOLEAN, false),
+    CATALOG("catalog", null, Type.STRING, false);
 
     private final String camelName;
     private final Object defaultValue;
diff --git a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java
index 604bc4b700..2af2d0117b 100644
--- a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java
+++ b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java
@@ -16,12 +16,14 @@
  */
 package org.apache.arrow.driver.jdbc.client;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Optional;
 import org.apache.arrow.driver.jdbc.FlightServerTestRule;
 import org.apache.arrow.driver.jdbc.utils.CoreMockedSqlProducers;
 import org.apache.arrow.memory.BufferAllocator;
@@ -142,5 +144,30 @@ public class ArrowFlightSqlClientHandlerBuilderTest {
     assertNull(builder.tlsRootCertificatesPath);
     assertNull(builder.clientCertificatePath);
     assertNull(builder.clientKeyPath);
+    assertEquals(Optional.empty(), builder.catalog);
+  }
+
+  @Test
+  public void testCatalog() {
+    ArrowFlightSqlClientHandler.Builder rootBuilder = new ArrowFlightSqlClientHandler.Builder();
+
+    rootBuilder.withCatalog(null);
+    assertFalse(rootBuilder.catalog.isPresent());
+
+    rootBuilder.withCatalog("");
+    assertTrue(rootBuilder.catalog.isPresent());
+
+    rootBuilder.withCatalog("   ");
+    assertTrue(rootBuilder.catalog.isPresent());
+
+    final String noSpaces = "noSpaces";
+    rootBuilder.withCatalog(noSpaces);
+    assertTrue(rootBuilder.catalog.isPresent());
+    assertEquals(noSpaces, rootBuilder.catalog.get());
+
+    final String nameWithSpaces = "  spaces ";
+    rootBuilder.withCatalog(nameWithSpaces);
+    assertTrue(rootBuilder.catalog.isPresent());
+    assertEquals(nameWithSpaces, rootBuilder.catalog.get());
   }
 }
diff --git a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImplTest.java b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImplTest.java
index 616f6e4b36..56c5f0c755 100644
--- a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImplTest.java
+++ b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImplTest.java
@@ -18,6 +18,7 @@ package org.apache.arrow.driver.jdbc.utils;
 
 import static java.lang.Runtime.getRuntime;
 import static java.util.Arrays.asList;
+import static org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.CATALOG;
 import static org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.HOST;
 import static org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.PASSWORD;
 import static org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.PORT;
@@ -107,6 +108,12 @@ public final class ArrowFlightConnectionConfigImplTest {
             (Function<ArrowFlightConnectionConfigImpl, ?>)
                 ArrowFlightConnectionConfigImpl::threadPoolSize
           },
+          {
+            CATALOG,
+            "catalog",
+            (Function<ArrowFlightConnectionConfigImpl, ?>)
+                ArrowFlightConnectionConfigImpl::getCatalog
+          },
         });
   }
 }

Reply via email to