This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new ef21008d4e GH-34284: [Java][FlightRPC] Fixed issue with prepared
statement getting sent twice (#34358)
ef21008d4e is described below
commit ef21008d4e253188092f3c54ad4f7e83aac8d79e
Author: rtadepalli <[email protected]>
AuthorDate: Tue Feb 28 08:06:29 2023 -0500
GH-34284: [Java][FlightRPC] Fixed issue with prepared statement getting
sent twice (#34358)
### Rationale for this change
The `AvaticaConnection` class within the Calcite library
[executes](https://github.com/apache/calcite-avatica/blob/b57eb7cd31a90d3f46b65c13832b398be5b0dad9/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java#L357)
the `prepareStatement(...)` function on a DB connection in the following order:
1. Calls `prepare()` on the `MetaImpl` class, which in our case is
`ArrowFlightMetaImpl`. This function executes a command and returns a prepared
statement.
2. Calls `newPreparedStatement` on the JDBC factory class, which in our
case is `ArrowFlightJdbcFactory`. Today, this function creates an entirely new
`PreparedStatement` without checking the statement cache, so the statement is
prepared on the server twice.
This PR fixes the issue by checking whether the `StatementHandle` has
already been recorded and, if so, reusing the results of the previously
executed command. If not, it creates a brand new `PreparedStatement`.
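My understanding is that `ConcurrentHashMap` (which is what is being used
to keep a mapping of statement handles to prepared statements) is eventually
consistent, so there _may_ be cases where the prepared statement execution runs
twice. (Note: per its Javadoc, `ConcurrentHashMap.get` reflects the most
recently completed update for a key; only its iterators and aggregate
operations are weakly consistent.)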
### What changes are included in this PR?
Check whether a `PreparedStatement` has already been created for the statement
handle and, if so, reuse it instead of contacting the server to prepare the
statement again.
### Are these changes tested?
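Existing tests should cover this case; namely, I _think_
`ArrowFlightPreparedStatementTest` covers all cases. This commit also adds
`testPreparedStatementExecutionOnce` to that class, which asserts that the
create-prepared-statement action is invoked exactly once.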
### Are there any user-facing changes?
There are no user-facing changes.
* Closes: #34284
Authored-by: Ramasai <[email protected]>
Signed-off-by: David Li <[email protected]>
---
.../arrow/driver/jdbc/ArrowFlightJdbcFactory.java | 20 +++++++++++++++++---
.../driver/jdbc/ArrowFlightPreparedStatement.java | 12 ++++++++++++
.../jdbc/ArrowFlightPreparedStatementTest.java | 17 +++++++++++++++++
.../driver/jdbc/utils/MockFlightSqlProducer.java | 21 +++++++++++++++++++++
4 files changed, 67 insertions(+), 3 deletions(-)
diff --git
a/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFactory.java
b/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFactory.java
index a54fbb9511..216e4cd002 100644
---
a/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFactory.java
+++
b/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFactory.java
@@ -17,12 +17,16 @@
package org.apache.arrow.driver.jdbc;
+import static
org.apache.arrow.driver.jdbc.utils.ConvertUtils.convertArrowFieldsToColumnMetaDataList;
+
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Properties;
import java.util.TimeZone;
+import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaFactory;
import org.apache.calcite.avatica.AvaticaResultSetMetaData;
@@ -81,9 +85,19 @@ public class ArrowFlightJdbcFactory implements
AvaticaFactory {
final int resultType,
final int resultSetConcurrency,
final int resultSetHoldability) throws SQLException {
- return ArrowFlightPreparedStatement.createNewPreparedStatement(
- (ArrowFlightConnection) connection, statementHandle, signature,
- resultType, resultSetConcurrency, resultSetHoldability);
+ final ArrowFlightConnection flightConnection = (ArrowFlightConnection)
connection;
+ ArrowFlightSqlClientHandler.PreparedStatement preparedStatement =
+ flightConnection.getMeta().getPreparedStatement(statementHandle);
+
+ if (preparedStatement == null) {
+ preparedStatement =
flightConnection.getClientHandler().prepare(signature.sql);
+ }
+ final Schema resultSetSchema = preparedStatement.getDataSetSchema();
+
signature.columns.addAll(convertArrowFieldsToColumnMetaDataList(resultSetSchema.getFields()));
+
+ return ArrowFlightPreparedStatement.newPreparedStatement(
+ flightConnection, preparedStatement, statementHandle,
+ signature, resultType, resultSetConcurrency, resultSetHoldability);
}
@Override
diff --git
a/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java
b/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java
index 80029f38f0..8784e39840 100644
---
a/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java
+++
b/java/flight/flight-sql-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java
@@ -80,6 +80,18 @@ public class ArrowFlightPreparedStatement extends
AvaticaPreparedStatement
signature, resultSetType, resultSetConcurrency, resultSetHoldability);
}
+ static ArrowFlightPreparedStatement newPreparedStatement(final
ArrowFlightConnection connection,
+ final ArrowFlightSqlClientHandler.PreparedStatement preparedStmt,
+ final StatementHandle statementHandle,
+ final Signature signature,
+ final int resultSetType,
+ final int resultSetConcurrency,
+ final int resultSetHoldability) throws SQLException {
+ return new ArrowFlightPreparedStatement(
+ connection, preparedStmt, statementHandle,
+ signature, resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
@Override
public ArrowFlightConnection getConnection() throws SQLException {
return (ArrowFlightConnection) super.getConnection();
diff --git
a/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java
b/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java
index 8af529296f..42fb31e811 100644
---
a/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java
+++
b/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java
@@ -27,7 +27,9 @@ import java.sql.SQLException;
import org.apache.arrow.driver.jdbc.utils.CoreMockedSqlProducers;
import org.apache.arrow.driver.jdbc.utils.MockFlightSqlProducer;
+import org.apache.arrow.flight.sql.FlightSqlUtils;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -56,6 +58,11 @@ public class ArrowFlightPreparedStatementTest {
connection.close();
}
+ @Before
+ public void before() {
+ PRODUCER.clearActionTypeCounter();
+ }
+
@Test
public void testSimpleQueryNoParameterBinding() throws SQLException {
final String query = CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD;
@@ -65,6 +72,16 @@ public class ArrowFlightPreparedStatementTest {
}
}
+ @Test
+ public void testPreparedStatementExecutionOnce() throws SQLException {
+ final PreparedStatement statement =
connection.prepareStatement(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD);
+ // Expect that there is one entry in the map -- {prepared statement action
type, invocation count}.
+ assertEquals(PRODUCER.getActionTypeCounter().size(), 1);
+ // Expect that the prepared statement was executed exactly once.
+
assertEquals(PRODUCER.getActionTypeCounter().get(FlightSqlUtils.FLIGHT_SQL_CREATE_PREPARED_STATEMENT.getType()),
1);
+ statement.close();
+ }
+
@Test
public void testReturnColumnCount() throws SQLException {
final String query = CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD;
diff --git
a/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java
b/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java
index cc8fae9722..0299eeb46d 100644
---
a/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java
+++
b/java/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java
@@ -39,6 +39,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.IntStream;
+import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightDescriptor;
@@ -98,6 +99,8 @@ public final class MockFlightSqlProducer implements
FlightSqlProducer {
new HashMap<>();
private SqlInfoBuilder sqlInfoBuilder = new SqlInfoBuilder();
+ private final Map<String, Integer> actionTypeCounter = new HashMap<>();
+
private static FlightInfo getFightInfoExportedAndImportedKeys(final Message
message,
final
FlightDescriptor descriptor) {
return getFlightInfo(message, Schemas.GET_IMPORTED_KEYS_SCHEMA,
descriptor);
@@ -502,6 +505,24 @@ public final class MockFlightSqlProducer implements
FlightSqlProducer {
throw CallStatus.UNIMPLEMENTED.toRuntimeException();
}
+ @Override
+ public void doAction(CallContext context, Action action,
StreamListener<Result> listener) {
+ FlightSqlProducer.super.doAction(context, action, listener);
+ actionTypeCounter.put(action.getType(),
actionTypeCounter.getOrDefault(action.getType(), 0) + 1);
+ }
+
+ /**
+ * Clear the `actionTypeCounter` map and restore to its default state.
Intended to be used in tests.
+ */
+ public void clearActionTypeCounter() {
+ actionTypeCounter.clear();
+ }
+
+ public Map<String, Integer> getActionTypeCounter() {
+ return actionTypeCounter;
+ }
+
+
private void getStreamCatalogFunctions(final Message ticket,
final ServerStreamListener
serverStreamListener) {
Preconditions.checkNotNull(