This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new ef21008d4e GH-34284: [Java][FlightRPC] Fixed issue with prepared 
statement getting sent twice (#34358)
ef21008d4e is described below

commit ef21008d4e253188092f3c54ad4f7e83aac8d79e
Author: rtadepalli <[email protected]>
AuthorDate: Tue Feb 28 08:06:29 2023 -0500

    GH-34284: [Java][FlightRPC] Fixed issue with prepared statement getting 
sent twice (#34358)
    
    ### Rationale for this change
    
    The `AvaticaConnection` class within the Calcite library 
[executes](https://github.com/apache/calcite-avatica/blob/b57eb7cd31a90d3f46b65c13832b398be5b0dad9/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java#L357)
 the `prepareStatement(...)` function on a DB connection in the following order:
    1. Calls `prepare()` on the `MetaImpl` class, which in our case is 
`ArrowFlightMetaImpl`. This function executes a command and returns a prepared 
statement.
    2. Calls `newPreparedStatement` on the JDBC factory class, which in our 
case is `ArrowFlightJdbcFactory`. This function today creates an entirely new 
`PreparedStatement` without checking the statement cache, thereby causing 2 
executions.
    
    This PR fixes the issue by checking whether or not the `StatementHandle` 
has been recorded before, and if so, uses the results of the previously 
executed command. If not, it creates a brand new `PreparedStatement`.
    
    My understanding is that `ConcurrentHashMap` (which is what is being used 
to keep a mapping of statement handles to prepared statements) is eventually 
consistent, so there _may_ be cases where the prepared statement execution runs 
twice.
    
    ### What changes are included in this PR?
    Check to see if the `PreparedStatement` has run before, and if so, use that 
instead of executing the command to contact the server again.
    
    ### Are these changes tested?
    
    Existing tests should cover this case. Namely, 
`ArrowFlightPreparedStatementTest` I _think_ should cover all cases.
    
    ### Are there any user-facing changes?
    There are no user facing changes.
    * Closes: #34284
    
    Authored-by: Ramasai <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 .../arrow/driver/jdbc/ArrowFlightJdbcFactory.java   | 20 +++++++++++++++++---
 .../driver/jdbc/ArrowFlightPreparedStatement.java   | 12 ++++++++++++
 .../jdbc/ArrowFlightPreparedStatementTest.java      | 17 +++++++++++++++++
 .../driver/jdbc/utils/MockFlightSqlProducer.java    | 21 +++++++++++++++++++++
 4 files changed, 67 insertions(+), 3 deletions(-)

diff --git 
a/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFactory.java
 
b/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFactory.java
index a54fbb9511..216e4cd002 100644
--- 
a/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFactory.java
+++ 
b/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFactory.java
@@ -17,12 +17,16 @@
 
 package org.apache.arrow.driver.jdbc;
 
+import static 
org.apache.arrow.driver.jdbc.utils.ConvertUtils.convertArrowFieldsToColumnMetaDataList;
+
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.Properties;
 import java.util.TimeZone;
 
+import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.avatica.AvaticaFactory;
 import org.apache.calcite.avatica.AvaticaResultSetMetaData;
@@ -81,9 +85,19 @@ public class ArrowFlightJdbcFactory implements 
AvaticaFactory {
       final int resultType,
       final int resultSetConcurrency,
       final int resultSetHoldability) throws SQLException {
-    return ArrowFlightPreparedStatement.createNewPreparedStatement(
-        (ArrowFlightConnection) connection, statementHandle, signature,
-        resultType, resultSetConcurrency, resultSetHoldability);
+    final ArrowFlightConnection flightConnection = (ArrowFlightConnection) 
connection;
+    ArrowFlightSqlClientHandler.PreparedStatement preparedStatement =
+        flightConnection.getMeta().getPreparedStatement(statementHandle);
+
+    if (preparedStatement == null) {
+      preparedStatement = 
flightConnection.getClientHandler().prepare(signature.sql);
+    }
+    final Schema resultSetSchema = preparedStatement.getDataSetSchema();
+    
signature.columns.addAll(convertArrowFieldsToColumnMetaDataList(resultSetSchema.getFields()));
+
+    return ArrowFlightPreparedStatement.newPreparedStatement(
+        flightConnection, preparedStatement, statementHandle,
+        signature, resultType, resultSetConcurrency, resultSetHoldability);
   }
 
   @Override
diff --git 
a/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java
 
b/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java
index 80029f38f0..8784e39840 100644
--- 
a/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java
+++ 
b/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java
@@ -80,6 +80,18 @@ public class ArrowFlightPreparedStatement extends 
AvaticaPreparedStatement
         signature, resultSetType, resultSetConcurrency, resultSetHoldability);
   }
 
+  static ArrowFlightPreparedStatement newPreparedStatement(final 
ArrowFlightConnection connection,
+      final ArrowFlightSqlClientHandler.PreparedStatement preparedStmt,
+      final StatementHandle statementHandle,
+      final Signature signature,
+      final int resultSetType,
+      final int resultSetConcurrency,
+      final int resultSetHoldability) throws SQLException {
+    return new ArrowFlightPreparedStatement(
+        connection, preparedStmt, statementHandle,
+        signature, resultSetType, resultSetConcurrency, resultSetHoldability);
+  }
+
   @Override
   public ArrowFlightConnection getConnection() throws SQLException {
     return (ArrowFlightConnection) super.getConnection();
diff --git 
a/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java
 
b/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java
index 8af529296f..42fb31e811 100644
--- 
a/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java
+++ 
b/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java
@@ -27,7 +27,9 @@ import java.sql.SQLException;
 
 import org.apache.arrow.driver.jdbc.utils.CoreMockedSqlProducers;
 import org.apache.arrow.driver.jdbc.utils.MockFlightSqlProducer;
+import org.apache.arrow.flight.sql.FlightSqlUtils;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -56,6 +58,11 @@ public class ArrowFlightPreparedStatementTest {
     connection.close();
   }
 
+  @Before
+  public void before() {
+    PRODUCER.clearActionTypeCounter();
+  }
+
   @Test
   public void testSimpleQueryNoParameterBinding() throws SQLException {
     final String query = CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD;
@@ -65,6 +72,16 @@ public class ArrowFlightPreparedStatementTest {
     }
   }
 
+  @Test
+  public void testPreparedStatementExecutionOnce() throws SQLException {
+    final PreparedStatement statement = 
connection.prepareStatement(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD);
+    // Expect that there is one entry in the map -- {prepared statement action 
type, invocation count}.
+    assertEquals(PRODUCER.getActionTypeCounter().size(), 1);
+    // Expect that the prepared statement was executed exactly once.
+    
assertEquals(PRODUCER.getActionTypeCounter().get(FlightSqlUtils.FLIGHT_SQL_CREATE_PREPARED_STATEMENT.getType()),
 1);
+    statement.close();
+  }
+
   @Test
   public void testReturnColumnCount() throws SQLException {
     final String query = CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD;
diff --git 
a/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java
 
b/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java
index cc8fae9722..0299eeb46d 100644
--- 
a/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java
+++ 
b/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java
@@ -39,6 +39,7 @@ import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.IntStream;
 
+import org.apache.arrow.flight.Action;
 import org.apache.arrow.flight.CallStatus;
 import org.apache.arrow.flight.Criteria;
 import org.apache.arrow.flight.FlightDescriptor;
@@ -98,6 +99,8 @@ public final class MockFlightSqlProducer implements 
FlightSqlProducer {
       new HashMap<>();
   private SqlInfoBuilder sqlInfoBuilder = new SqlInfoBuilder();
 
+  private final Map<String, Integer> actionTypeCounter = new HashMap<>();
+
   private static FlightInfo getFightInfoExportedAndImportedKeys(final Message 
message,
                                                                 final 
FlightDescriptor descriptor) {
     return getFlightInfo(message, Schemas.GET_IMPORTED_KEYS_SCHEMA, 
descriptor);
@@ -502,6 +505,24 @@ public final class MockFlightSqlProducer implements 
FlightSqlProducer {
     throw CallStatus.UNIMPLEMENTED.toRuntimeException();
   }
 
+  @Override
+  public void doAction(CallContext context, Action action, 
StreamListener<Result> listener) {
+    FlightSqlProducer.super.doAction(context, action, listener);
+    actionTypeCounter.put(action.getType(), 
actionTypeCounter.getOrDefault(action.getType(), 0) + 1);
+  }
+
+  /**
+   * Clear the `actionTypeCounter` map and restore to its default state. 
Intended to be used in tests.
+   */
+  public void clearActionTypeCounter() {
+    actionTypeCounter.clear();
+  }
+
+  public Map<String, Integer> getActionTypeCounter() {
+    return actionTypeCounter;
+  }
+
+
   private void getStreamCatalogFunctions(final Message ticket,
                                          final ServerStreamListener 
serverStreamListener) {
     Preconditions.checkNotNull(

Reply via email to