This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a8b155e9c6 Fixes for the Avatica JDBC driver (#12709)
a8b155e9c6 is described below

commit a8b155e9c6e4699d30ba2386c6ec88a3004eb819
Author: Paul Rogers <[email protected]>
AuthorDate: Wed Jul 27 15:22:40 2022 -0700

    Fixes for the Avatica JDBC driver (#12709)
    
    * Fixes for the Avatica JDBC driver
    
    Correctly implement regular and prepared statements
    Correctly implement result sets
    Fix race condition with contexts
    Clarify when parameters are used
    Prepare for single-pass through the planner
    
    * Addressed review comments
    
    * Addressed review comment
---
 .../security/ITBasicAuthConfigurationTest.java     |   2 +-
 .../security/ITBasicAuthLdapConfigurationTest.java |   2 +-
 .../java/org/apache/druid/sql/SqlLifecycle.java    |   2 +-
 .../java/org/apache/druid/sql/SqlQueryPlus.java    | 128 +++++
 .../sql/avatica/AbstractDruidJdbcStatement.java    | 257 +++++++++
 .../apache/druid/sql/avatica/DruidConnection.java  | 130 ++++-
 .../sql/avatica/DruidJdbcPreparedStatement.java    | 160 ++++++
 .../druid/sql/avatica/DruidJdbcResultSet.java      | 250 ++++++++
 .../druid/sql/avatica/DruidJdbcStatement.java      |  79 +++
 .../org/apache/druid/sql/avatica/DruidMeta.java    | 188 +++---
 .../apache/druid/sql/avatica/DruidStatement.java   | 449 ---------------
 .../druid/sql/avatica/DruidAvaticaHandlerTest.java | 635 +++++++++++++--------
 .../sql/avatica/DruidAvaticaJsonHandlerTest.java   |  47 --
 .../druid/sql/avatica/DruidStatementTest.java      | 554 +++++++++++++++---
 14 files changed, 1925 insertions(+), 958 deletions(-)

diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
index 562ca66de0..7acf915954 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
@@ -48,7 +48,7 @@ public class ITBasicAuthConfigurationTest extends 
AbstractAuthConfigurationTest
   private static final String BASIC_AUTHORIZER = "basic";
 
   private static final String EXPECTED_AVATICA_AUTH_ERROR = "Error while 
executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver 
error: QueryInterruptedException: User metadata store authentication failed. -> 
BasicSecurityAuthenticationException: User metadata store authentication 
failed.";
-  private static final String EXPECTED_AVATICA_AUTHZ_ERROR = "Error while 
executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver 
error: RuntimeException: org.apache.druid.server.security.ForbiddenException: 
Allowed:false, Message: -> ForbiddenException: Allowed:false, Message:";
+  private static final String EXPECTED_AVATICA_AUTHZ_ERROR = "Error while 
executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver 
error: ForbiddenException: Allowed:false, Message:";
 
   private HttpClient druid99;
 
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
index a3e7291bb2..f174cdf8fa 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
@@ -54,7 +54,7 @@ public class ITBasicAuthLdapConfigurationTest extends 
AbstractAuthConfigurationT
   private static final String LDAP_AUTHORIZER = "ldapauth";
 
   private static final String EXPECTED_AVATICA_AUTH_ERROR = "Error while 
executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver 
error: QueryInterruptedException: User LDAP authentication failed. -> 
BasicSecurityAuthenticationException: User LDAP authentication failed.";
-  private static final String EXPECTED_AVATICA_AUTHZ_ERROR = "Error while 
executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver 
error: RuntimeException: org.apache.druid.server.security.ForbiddenException: 
Allowed:false, Message: -> ForbiddenException: Allowed:false, Message:";
+  private static final String EXPECTED_AVATICA_AUTHZ_ERROR = "Error while 
executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver 
error: ForbiddenException: Allowed:false, Message:";
 
   @Inject
   IntegrationTestingConfig config;
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java 
b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
index 9faca5c600..002fd3045a 100644
--- a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
+++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
@@ -170,7 +170,7 @@ public class SqlLifecycle
   }
 
   /**
-   * Assign dynamic parameters to be used to substitute values during query 
exection. This can be performed at any
+   * Assign dynamic parameters to be used to substitute values during query 
execution. This can be performed at any
    * part of the lifecycle.
    */
   public void setParameters(List<TypedValue> parameters)
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlQueryPlus.java 
b/sql/src/main/java/org/apache/druid/sql/SqlQueryPlus.java
new file mode 100644
index 0000000000..bebf74b1a3
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/SqlQueryPlus.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.avatica.remote.TypedValue;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.sql.http.SqlParameter;
+import org.apache.druid.sql.http.SqlQuery;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Captures the inputs to a SQL execution request: the statement,
+ * the context, parameters, and the authorization result. Pass this
+ * around rather than the quad of items. The request can evolve:
+ * items can be filled in later as needed (except for the SQL
+ * and auth result, which are required.)
+ */
+public class SqlQueryPlus
+{
+  private final String sql;
+  private final QueryContext queryContext;
+  private final List<TypedValue> parameters;
+  private final AuthenticationResult authResult;
+
+  public SqlQueryPlus(
+      String sql,
+      QueryContext queryContext,
+      List<TypedValue> parameters,
+      AuthenticationResult authResult
+  )
+  {
+    this.sql = Preconditions.checkNotNull(sql);
+    this.queryContext = queryContext == null
+        ? new QueryContext()
+        : queryContext;
+    this.parameters = parameters == null
+        ? Collections.emptyList()
+        : parameters;
+    this.authResult = Preconditions.checkNotNull(authResult);
+  }
+
+  public SqlQueryPlus(final String sql, final AuthenticationResult authResult)
+  {
+    this(sql, (QueryContext) null, null, authResult);
+  }
+
+  public static SqlQueryPlus fromSqlParameters(
+      String sql,
+      Map<String, Object> queryContext,
+      List<SqlParameter> parameters,
+      AuthenticationResult authResult
+  )
+  {
+    return new SqlQueryPlus(
+        sql,
+        queryContext == null ? null : new QueryContext(queryContext),
+        parameters == null ? null : SqlQuery.getParameterList(parameters),
+        authResult
+     );
+  }
+
+  public static SqlQueryPlus from(
+      String sql,
+      Map<String, Object> queryContext,
+      List<TypedValue> parameters,
+      AuthenticationResult authResult
+  )
+  {
+    return new SqlQueryPlus(
+        sql,
+        queryContext == null ? null : new QueryContext(queryContext),
+        parameters,
+        authResult
+    );
+  }
+
+  public String sql()
+  {
+    return sql;
+  }
+
+  public QueryContext context()
+  {
+    return queryContext;
+  }
+
+  public List<TypedValue> parameters()
+  {
+    return parameters;
+  }
+
+  public AuthenticationResult authResult()
+  {
+    return authResult;
+  }
+
+  public SqlQueryPlus withContext(QueryContext context)
+  {
+    return new SqlQueryPlus(sql, context, parameters, authResult);
+  }
+
+  public SqlQueryPlus withParameters(List<TypedValue> parameters)
+  {
+    return new SqlQueryPlus(sql, queryContext, parameters, authResult);
+  }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
 
b/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
new file mode 100644
index 0000000000..399ecf673f
--- /dev/null
+++ 
b/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.avatica;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.avatica.AvaticaParameter;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.PrepareResult;
+
+import java.io.Closeable;
+import java.sql.Array;
+import java.sql.DatabaseMetaData;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Common implementation for the JDBC {@code Statement} and
+ * {@code PreparedStatement} implementations in Druid. Statement use
+ * {@link DruidJdbcResultSet} objects to iterate through rows: zero
+ * or one may be open at any time, and a single statement supports
+ * multiple result sets sequentially. Druid closes the result set after
+ * the last batch in compliance with this note on page 137 of the
+ * <a 
href="https://download.oracle.com/otn-pub/jcp/jdbc-4_1-mrel-spec/jdbc4.1-fr-spec.pdf?AuthParam=1655858260_d6388aeec9aeb8e5616d221749b1ff71";>
+ * JDBC 4.1 specification</a>:
+ * <p>
+ * <i>Some JDBC driver implementations may also implicitly close the
+ * ResultSet when the ResultSet type is TYPE_FORWARD_ONLY and the next
+ * method of ResultSet returns false.</i>
+ */
+public abstract class AbstractDruidJdbcStatement implements Closeable
+{
+  public static final long START_OFFSET = 0;
+
+  protected final DruidConnection connection;
+  protected final int statementId;
+  protected DruidJdbcResultSet resultSet;
+
+  public AbstractDruidJdbcStatement(
+      final DruidConnection connection,
+      final int statementId
+  )
+  {
+    this.connection = Preconditions.checkNotNull(connection, "connection");
+    this.statementId = statementId;
+  }
+
+  protected static Meta.Signature createSignature(PrepareResult prepareResult, 
String sql)
+  {
+    List<AvaticaParameter> params = new ArrayList<>();
+    final RelDataType parameterRowType = prepareResult.getParameterRowType();
+    for (RelDataTypeField field : parameterRowType.getFieldList()) {
+      RelDataType type = field.getType();
+      params.add(createParameter(field, type));
+    }
+    return Meta.Signature.create(
+        createColumnMetaData(prepareResult.getRowType()),
+        sql,
+        params,
+        Meta.CursorFactory.ARRAY,
+        Meta.StatementType.SELECT // We only support SELECT
+    );
+  }
+
+  private static AvaticaParameter createParameter(RelDataTypeField field, 
RelDataType type)
+  {
+    // signed is always false because no way to extract from RelDataType, and 
the only usage of this AvaticaParameter
+    // constructor I can find, in CalcitePrepareImpl, does it this way with 
hard coded false
+    return new AvaticaParameter(
+        false,
+        type.getPrecision(),
+        type.getScale(),
+        type.getSqlTypeName().getJdbcOrdinal(),
+        type.getSqlTypeName().getName(),
+        Calcites.sqlTypeNameJdbcToJavaClass(type.getSqlTypeName()).getName(),
+        field.getName()
+    );
+  }
+
+  public static List<ColumnMetaData> createColumnMetaData(final RelDataType 
rowType)
+  {
+    final List<ColumnMetaData> columns = new ArrayList<>();
+    List<RelDataTypeField> fieldList = rowType.getFieldList();
+
+    for (int i = 0; i < fieldList.size(); i++) {
+      RelDataTypeField field = fieldList.get(i);
+
+      final ColumnMetaData.AvaticaType columnType;
+      if (field.getType().getSqlTypeName() == SqlTypeName.ARRAY) {
+        final ColumnMetaData.Rep elementRep = 
rep(field.getType().getComponentType().getSqlTypeName());
+        final ColumnMetaData.ScalarType elementType = ColumnMetaData.scalar(
+            
field.getType().getComponentType().getSqlTypeName().getJdbcOrdinal(),
+            field.getType().getComponentType().getSqlTypeName().getName(),
+            elementRep
+        );
+        final ColumnMetaData.Rep arrayRep = 
rep(field.getType().getSqlTypeName());
+        columnType = ColumnMetaData.array(
+            elementType,
+            field.getType().getSqlTypeName().getName(),
+            arrayRep
+        );
+      } else {
+        final ColumnMetaData.Rep rep = rep(field.getType().getSqlTypeName());
+        columnType = ColumnMetaData.scalar(
+            field.getType().getSqlTypeName().getJdbcOrdinal(),
+            field.getType().getSqlTypeName().getName(),
+            rep
+        );
+      }
+      columns.add(
+          new ColumnMetaData(
+              i, // ordinal
+              false, // auto increment
+              true, // case sensitive
+              false, // searchable
+              false, // currency
+              field.getType().isNullable()
+              ? DatabaseMetaData.columnNullable
+              : DatabaseMetaData.columnNoNulls, // nullable
+              true, // signed
+              field.getType().getPrecision(), // display size
+              field.getName(), // label
+              null, // column name
+              null, // schema name
+              field.getType().getPrecision(), // precision
+              field.getType().getScale(), // scale
+              null, // table name
+              null, // catalog name
+              columnType, // avatica type
+              true, // read only
+              false, // writable
+              false, // definitely writable
+              columnType.columnClassName() // column class name
+          )
+      );
+    }
+
+    return columns;
+  }
+
+  private static ColumnMetaData.Rep rep(final SqlTypeName sqlType)
+  {
+    if (SqlTypeName.CHAR_TYPES.contains(sqlType)) {
+      return ColumnMetaData.Rep.of(String.class);
+    } else if (sqlType == SqlTypeName.TIMESTAMP) {
+      return ColumnMetaData.Rep.of(Long.class);
+    } else if (sqlType == SqlTypeName.DATE) {
+      return ColumnMetaData.Rep.of(Integer.class);
+    } else if (sqlType == SqlTypeName.INTEGER) {
+      // use Number.class for exact numeric types since JSON transport might 
switch longs to integers
+      return ColumnMetaData.Rep.of(Number.class);
+    } else if (sqlType == SqlTypeName.BIGINT) {
+      // use Number.class for exact numeric types since JSON transport might 
switch longs to integers
+      return ColumnMetaData.Rep.of(Number.class);
+    } else if (sqlType == SqlTypeName.FLOAT) {
+      return ColumnMetaData.Rep.of(Float.class);
+    } else if (sqlType == SqlTypeName.DOUBLE || sqlType == 
SqlTypeName.DECIMAL) {
+      return ColumnMetaData.Rep.of(Double.class);
+    } else if (sqlType == SqlTypeName.BOOLEAN) {
+      return ColumnMetaData.Rep.of(Boolean.class);
+    } else if (sqlType == SqlTypeName.OTHER) {
+      return ColumnMetaData.Rep.of(Object.class);
+    } else if (sqlType == SqlTypeName.ARRAY) {
+      return ColumnMetaData.Rep.of(Array.class);
+    } else {
+      throw new ISE("No rep for SQL type [%s]", sqlType);
+    }
+  }
+
+  public Meta.Frame nextFrame(final long fetchOffset, final int 
fetchMaxRowCount)
+  {
+    Meta.Frame frame = requireResultSet().nextFrame(fetchOffset, 
fetchMaxRowCount);
+
+    // Implicitly close after the last result frame.
+    if (frame.done) {
+      closeResultSet();
+    }
+    return frame;
+  }
+
+  public abstract Meta.Signature getSignature();
+
+  public void closeResultSet()
+  {
+    // Lock held only to get the result set, not during cleanup.
+    DruidJdbcResultSet currentResultSet;
+    synchronized (this) {
+      currentResultSet = resultSet;
+      resultSet = null;
+    }
+    if (currentResultSet != null) {
+      currentResultSet.close();
+    }
+  }
+
+  protected synchronized DruidJdbcResultSet requireResultSet()
+  {
+    if (resultSet == null) {
+      throw new ISE("No result set open for statement [%d]", statementId);
+    }
+    return resultSet;
+  }
+
+  public long getCurrentOffset()
+  {
+    return requireResultSet().getCurrentOffset();
+  }
+
+  public synchronized boolean isDone()
+  {
+    return resultSet == null ? true : resultSet.isDone();
+  }
+
+  @Override
+  public synchronized void close()
+  {
+    closeResultSet();
+  }
+
+  public String getConnectionId()
+  {
+    return connection.getConnectionId();
+  }
+
+  public int getStatementId()
+  {
+    return statementId;
+  }
+
+  public ExecutorService executor()
+  {
+    return connection.executor();
+  }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
index 2cd277f1c5..ab6ae65a98 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
@@ -20,17 +20,22 @@
 package org.apache.druid.sql.avatica;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.calcite.tools.RelConversionException;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.SqlQueryPlus;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -44,15 +49,14 @@ public class DruidConnection
 
   private final String connectionId;
   private final int maxStatements;
-  private final ImmutableMap<String, Object> userSecret;
-  private final QueryContext context;
+  private final Map<String, Object> userSecret;
+  private final Map<String, Object> context;
   private final AtomicInteger statementCounter = new AtomicInteger();
   private final AtomicReference<Future<?>> timeoutFuture = new 
AtomicReference<>();
+  private final ExecutorService yielderOpenCloseExecutor;
 
-  // Typically synchronized by connectionLock, except in one case: the onClose 
function passed
-  // into DruidStatements contained by the map.
   @GuardedBy("connectionLock")
-  private final ConcurrentMap<Integer, DruidStatement> statements;
+  private final ConcurrentMap<Integer, AbstractDruidJdbcStatement> statements 
= new ConcurrentHashMap<>();
   private final Object connectionLock = new Object();
 
   @GuardedBy("connectionLock")
@@ -62,14 +66,19 @@ public class DruidConnection
       final String connectionId,
       final int maxStatements,
       final Map<String, Object> userSecret,
-      final QueryContext context
+      final Map<String, Object> context
   )
   {
     this.connectionId = Preconditions.checkNotNull(connectionId);
     this.maxStatements = maxStatements;
     this.userSecret = ImmutableMap.copyOf(userSecret);
-    this.context = context;
-    this.statements = new ConcurrentHashMap<>();
+    this.context = Preconditions.checkNotNull(context);
+    this.yielderOpenCloseExecutor = Execs.singleThreaded(
+        StringUtils.format(
+            "JDBCYielderOpenCloseExecutor-connection-%s",
+            StringUtils.encodeForFormat(connectionId)
+        )
+    );
   }
 
   public String getConnectionId()
@@ -77,7 +86,16 @@ public class DruidConnection
     return connectionId;
   }
 
-  public DruidStatement createStatement(SqlLifecycleFactory 
sqlLifecycleFactory)
+  public QueryContext makeContext()
+  {
+    // QueryContext constructor copies the context parameters.
+    // we don't want to stringify arrays for JDBC ever because Avatica needs 
to handle this
+    final QueryContext queryContext = new QueryContext(context);
+    queryContext.addSystemParam(PlannerContext.CTX_SQL_STRINGIFY_ARRAYS, 
false);
+    return queryContext;
+  }
+
+  public DruidJdbcStatement createStatement(SqlLifecycleFactory 
sqlLifecycleFactory)
   {
     final int statementId = statementCounter.incrementAndGet();
 
@@ -89,36 +107,85 @@ public class DruidConnection
       }
 
       if (statements.size() >= maxStatements) {
-        throw DruidMeta.logFailure(new ISE("Too many open statements, limit 
is[%,d]", maxStatements));
+        throw DruidMeta.logFailure(new ISE("Too many open statements, limit is 
[%,d]", maxStatements));
       }
 
       @SuppressWarnings("GuardedBy")
-      final DruidStatement statement = new DruidStatement(
-          connectionId,
+      final DruidJdbcStatement statement = new DruidJdbcStatement(
+          this,
           statementId,
-          context,
-          sqlLifecycleFactory.factorize(),
-          () -> {
-            // onClose function for the statement
-            LOG.debug("Connection[%s] closed statement[%s].", connectionId, 
statementId);
-            // statements will be accessed unsynchronized to avoid deadlock
-            statements.remove(statementId);
-          }
+          sqlLifecycleFactory
       );
 
       statements.put(statementId, statement);
-      LOG.debug("Connection[%s] opened statement[%s].", connectionId, 
statementId);
+      LOG.debug("Connection [%s] opened statement [%s].", connectionId, 
statementId);
       return statement;
     }
   }
 
-  public DruidStatement getStatement(final int statementId)
+  public DruidJdbcPreparedStatement createPreparedStatement(
+      SqlLifecycleFactory sqlLifecycleFactory,
+      SqlQueryPlus queryPlus,
+      final long maxRowCount)
+  {
+    final int statementId = statementCounter.incrementAndGet();
+
+    synchronized (connectionLock) {
+      if (statements.containsKey(statementId)) {
+        // Will only happen if statementCounter rolls over before old 
statements are cleaned up. If this
+        // ever happens then something fishy is going on, because we shouldn't 
have billions of statements.
+        throw DruidMeta.logFailure(new ISE("Uh oh, too many statements"));
+      }
+
+      if (statements.size() >= maxStatements) {
+        throw DruidMeta.logFailure(new ISE("Too many open statements, limit is 
[%,d]", maxStatements));
+      }
+
+      @SuppressWarnings("GuardedBy")
+      final DruidJdbcPreparedStatement jdbcStmt = new 
DruidJdbcPreparedStatement(
+          this,
+          statementId,
+          queryPlus,
+          sqlLifecycleFactory,
+          maxRowCount
+      );
+      jdbcStmt.prepare();
+
+      statements.put(statementId, jdbcStmt);
+      LOG.debug("Connection [%s] opened prepared statement [%s].", 
connectionId, statementId);
+      return jdbcStmt;
+    }
+  }
+
+  public void prepareAndExecute(
+      final DruidJdbcStatement druidStatement,
+      final SqlQueryPlus queryPlus,
+      final long maxRowCount
+  ) throws RelConversionException
+  {
+    Preconditions.checkNotNull(context, "JDBC connection context is null!");
+    druidStatement.execute(queryPlus.withContext(makeContext()), maxRowCount);
+  }
+
+  public AbstractDruidJdbcStatement getStatement(final int statementId)
   {
     synchronized (connectionLock) {
       return statements.get(statementId);
     }
   }
 
+  public void closeStatement(int statementId)
+  {
+    AbstractDruidJdbcStatement stmt;
+    synchronized (connectionLock) {
+      stmt = statements.remove(statementId);
+    }
+    if (stmt != null) {
+      stmt.close();
+      LOG.debug("Connection [%s] closed statement [%s].", connectionId, 
statementId);
+    }
+  }
+
   /**
    * Closes this connection if it has no statements.
    *
@@ -139,18 +206,18 @@ public class DruidConnection
   public void close()
   {
     synchronized (connectionLock) {
-      // Copy statements before iterating because statement.close() modifies 
it.
-      for (DruidStatement statement : 
ImmutableList.copyOf(statements.values())) {
+      open = false;
+      for (AbstractDruidJdbcStatement statement : statements.values()) {
         try {
           statement.close();
         }
         catch (Exception e) {
-          LOG.warn("Connection[%s] failed to close statement[%s]!", 
connectionId, statement.getStatementId());
+          LOG.warn("Connection [%s] failed to close statement [%s]!", 
connectionId, statement.getStatementId());
         }
       }
-
-      LOG.debug("Connection[%s] closed.", connectionId);
-      open = false;
+      statements.clear();
+      yielderOpenCloseExecutor.shutdownNow();
+      LOG.debug("Connection [%s] closed.", connectionId);
     }
   }
 
@@ -167,4 +234,9 @@ public class DruidConnection
   {
     return userSecret;
   }
+
+  public ExecutorService executor()
+  {
+    return yielderOpenCloseExecutor;
+  }
 }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
new file mode 100644
index 0000000000..5f23b8ee4d
--- /dev/null
+++ 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.avatica;
+
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.remote.TypedValue;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.sql.SqlLifecycle;
+import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.SqlQueryPlus;
+import org.apache.druid.sql.calcite.planner.PrepareResult;
+
+import java.util.List;
+
+/**
+ * The Druid implementation of the server-side representation of the
+ * JDBC {@code PreparedStatement} class. A prepared statement can be prepared
+ * once with a query, then executed any number of times, typically with
+ * parameter values. No parameters are provided during prepare, though we do
+ * learn the parameter definitions passed back to the client and used by
+ * Avatica for serialization. Each execution produces a
+ * {@link DruidJdbcResultSet}. Only one execution is active at a time.
+ */
+public class DruidJdbcPreparedStatement extends AbstractDruidJdbcStatement
+{
+  private final SqlLifecycle sqlStatement;
+  private final SqlQueryPlus queryPlus;
+  private final SqlLifecycleFactory lifecycleFactory;
+  private final long maxRowCount;
+  private Meta.Signature signature;
+  private State state = State.NEW;
+
+  public DruidJdbcPreparedStatement(
+      final DruidConnection connection,
+      final int statementId,
+      final SqlQueryPlus queryPlus,
+      final SqlLifecycleFactory lifecycleFactory,
+      final long maxRowCount
+  )
+  {
+    super(connection, statementId);
+    this.lifecycleFactory = lifecycleFactory;
+    this.queryPlus = queryPlus;
+    this.maxRowCount = maxRowCount;
+    this.sqlStatement = lifecycleFactory.factorize();
+    sqlStatement.initialize(queryPlus.sql(), connection.makeContext());
+  }
+
+  public synchronized void prepare()
+  {
+    try {
+      ensure(State.NEW);
+      sqlStatement.validateAndAuthorize(queryPlus.authResult());
+      PrepareResult prepareResult = sqlStatement.prepare();
+      signature = createSignature(
+          prepareResult,
+          queryPlus.sql()
+      );
+      state = State.PREPARED;
+    }
+    catch (ForbiddenException e) {
+      // Can't finalize statement in this case. Call will fail with an
+      // assertion error.
+      DruidMeta.logFailure(e);
+      state = State.CLOSED;
+      throw e;
+    }
+    catch (RuntimeException e) {
+      failed(e);
+      throw e;
+    }
+    catch (Throwable t) {
+      failed(t);
+      throw new RuntimeException(t);
+    }
+  }
+
+  @Override
+  public synchronized Meta.Signature getSignature()
+  {
+    ensure(State.PREPARED);
+    return signature;
+  }
+
+  public synchronized void execute(List<TypedValue> parameters)
+  {
+    ensure(State.PREPARED);
+    closeResultSet();
+    try {
+      SqlLifecycle directStmt = lifecycleFactory.factorize();
+      directStmt.initialize(queryPlus.sql(), connection.makeContext());
+      directStmt.setParameters(parameters);
+      resultSet = new DruidJdbcResultSet(this, queryPlus, directStmt, 
maxRowCount);
+      resultSet.execute();
+    }
+    // Failure to execute does not close the prepared statement.
+    catch (RuntimeException e) {
+      failed(e);
+      throw e;
+    }
+    catch (Throwable t) {
+      failed(t);
+      throw new RuntimeException(t);
+    }
+  }
+
+  @GuardedBy("this")
+  private void ensure(final State... desiredStates)
+  {
+    for (State desiredState : desiredStates) {
+      if (state == desiredState) {
+        return;
+      }
+    }
+    throw new ISE("Invalid action for state [%s]", state);
+  }
+
+  private void failed(Throwable t)
+  {
+    super.close();
+    sqlStatement.finalizeStateAndEmitLogsAndMetrics(t, null, -1);
+    state = State.CLOSED;
+  }
+
+  @Override
+  public synchronized void close()
+  {
+    if (state != State.CLOSED) {
+      super.close();
+      sqlStatement.finalizeStateAndEmitLogsAndMetrics(null, null, -1);
+    }
+    state = State.CLOSED;
+  }
+
+  enum State
+  {
+    NEW,
+    PREPARED,
+    CLOSED
+  }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
new file mode 100644
index 0000000000..d4c3eba1d0
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.avatica;
+
+import com.google.common.base.Preconditions;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.sql.SqlLifecycle;
+import org.apache.druid.sql.SqlQueryPlus;
+import org.apache.druid.sql.calcite.planner.PrepareResult;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Druid's server-side representation of a JDBC result set. At most one
+ * can be open per statement (standard or prepared). The implementation
+ * is based on Druid's {@link SqlLifecycle} class. Even if the result
+ * set is for a {@code PreparedStatement}, the result set itself uses
+ * a Druid {@code SqlLifecycle} which includes the parameter values
+ * given for the execution. This allows Druid's planner to use the "query
+ * optimized" form of parameter substitution: we replan the query for
+ * each execution with the parameter values.
+ * <p>
+ * Avatica returns results in {@link Meta.Frame} objects as batches of
+ * rows. The result set uses the {@code TYPE_FORWARD_ONLY} execution model:
+ * the application can only read results sequentially, the application
+ * can't jump around or read backwards. As a result, the enclosing
+ * statement closes the result set at EOF to release resources early.
+ */
+public class DruidJdbcResultSet implements Closeable
+{
+  /**
+   * Query metrics can only be used within a single thread. Because results can
+   * be paginated into multiple JDBC frames (each frame being processed by a
+   * potentially different thread), the thread that closes the yielder
+   * (resulting in a QueryMetrics emit() call) may not be the same thread that
+   * created the yielder (which initializes DefaultQueryMetrics with the 
current
+   * thread as the owner). Create and close the yielder with this single-thread
+   * executor to prevent this from happening.
+   * <p>
+   * The thread owner check in DefaultQueryMetrics is more aggressive than
+   * needed for this specific JDBC case, since the JDBC frames are processed
+   * sequentially. If the thread owner check is changed/loosened to permit this
+   * use case, we would not need to use this executor.
+   * <p>
+   * See discussion at:
+   * https://github.com/apache/druid/pull/4288
+   * https://github.com/apache/druid/pull/4415
+   */
+  private final AbstractDruidJdbcStatement jdbcStatement;
+  private final SqlQueryPlus sqlRequest;
+  private final SqlLifecycle stmt;
+  private final long maxRowCount;
+  private State state = State.NEW;
+  private Meta.Signature signature;
+  private Yielder<Object[]> yielder;
+  private int offset;
+
+  public DruidJdbcResultSet(
+      final AbstractDruidJdbcStatement jdbcStatement,
+      final SqlQueryPlus sqlRequest,
+      final SqlLifecycle stmt,
+      final long maxRowCount
+  )
+  {
+    this.jdbcStatement = jdbcStatement;
+    this.stmt = stmt;
+    this.sqlRequest = sqlRequest;
+    this.maxRowCount = maxRowCount;
+  }
+
+  public synchronized void execute() throws RelConversionException
+  {
+    ensure(State.NEW);
+    stmt.validateAndAuthorize(sqlRequest.authResult());
+    PrepareResult prepareResult = stmt.prepare();
+    stmt.plan();
+    signature = AbstractDruidJdbcStatement.createSignature(
+        prepareResult,
+        sqlRequest.sql()
+    );
+    try {
+      state = State.RUNNING;
+      final Sequence<Object[]> baseSequence = 
jdbcStatement.executor().submit(stmt::execute).get();
+
+      // We can't apply limits greater than Integer.MAX_VALUE, ignore them.
+      final Sequence<Object[]> retSequence =
+          maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE
+          ? baseSequence.limit((int) maxRowCount)
+          : baseSequence;
+
+      yielder = Yielders.each(retSequence);
+    }
+    catch (Throwable t) {
+      throw closeAndPropagateThrowable(t);
+    }
+  }
+
+  public synchronized boolean isDone()
+  {
+    return state == State.DONE;
+  }
+
+  public synchronized Meta.Signature getSignature()
+  {
+    ensure(State.RUNNING, State.DONE);
+    return signature;
+  }
+
+  public synchronized Meta.Frame nextFrame(final long fetchOffset, final int 
fetchMaxRowCount)
+  {
+    ensure(State.RUNNING, State.DONE);
+    Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] != 
offset [%,d]", fetchOffset, offset);
+    if (state == State.DONE) {
+      return new Meta.Frame(fetchOffset, true, Collections.emptyList());
+    }
+
+    try {
+      final List<Object> rows = new ArrayList<>();
+      while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset < 
fetchOffset + fetchMaxRowCount)) {
+        rows.add(yielder.get());
+        yielder = yielder.next(null);
+        offset++;
+      }
+
+      if (yielder.isDone()) {
+        state = State.DONE;
+      }
+
+      return new Meta.Frame(fetchOffset, state == State.DONE, rows);
+    }
+    catch (Throwable t) {
+      throw closeAndPropagateThrowable(t);
+    }
+  }
+
+  public synchronized long getCurrentOffset()
+  {
+    ensure(State.RUNNING, State.DONE);
+    return offset;
+  }
+
+  @GuardedBy("this")
+  private void ensure(final State... desiredStates)
+  {
+    for (State desiredState : desiredStates) {
+      if (state == desiredState) {
+        return;
+      }
+    }
+    throw new ISE("Invalid action for state [%s]", state);
+  }
+
+  private RuntimeException closeAndPropagateThrowable(Throwable t)
+  {
+    DruidMeta.logFailure(t);
+    // Report a failure so that the failure is logged.
+    try {
+      close(t);
+    }
+    catch (Throwable t1) {
+      t.addSuppressed(t1);
+    }
+    finally {
+      state = State.FAILED;
+    }
+
+    // Avoid unnecessary wrapping.
+    if (t instanceof RuntimeException) {
+      return (RuntimeException) t;
+    }
+    return new RuntimeException(t);
+  }
+
+  @Override
+  public synchronized void close()
+  {
+    close(null);
+  }
+
+  private void close(Throwable error)
+  {
+    if (state == State.NEW) {
+      state = State.CLOSED;
+    }
+    if (state == State.CLOSED || state == State.FAILED) {
+      return;
+    }
+    state = State.CLOSED;
+    try {
+      if (yielder != null) {
+        Yielder<Object[]> theYielder = this.yielder;
+        this.yielder = null;
+
+        // Put the close last, so any exceptions it throws are after we did 
the other cleanup above.
+        jdbcStatement.executor().submit(
+            () -> {
+              theYielder.close();
+              // makes this a Callable instead of Runnable so we don't need to 
catch exceptions inside the lambda
+              return null;
+            }
+        ).get();
+
+      }
+    }
+    catch (RuntimeException e) {
+      throw e;
+    }
+    catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
+    finally {
+      stmt.finalizeStateAndEmitLogsAndMetrics(error, null, -1);
+    }
+  }
+
+  private enum State
+  {
+    NEW,
+    RUNNING,
+    DONE,
+    FAILED,
+    CLOSED
+  }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java
new file mode 100644
index 0000000000..ebe64f5bde
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.avatica;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.sql.SqlLifecycle;
+import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.SqlQueryPlus;
+
+/**
+ * Represents Druid's version of the JDBC {@code Statement} class:
+ * can be executed multiple times, one after another, producing a
+ * {@link DruidJdbcResultSet} for each execution.
+ */
+public class DruidJdbcStatement extends AbstractDruidJdbcStatement
+{
+  private final SqlLifecycleFactory lifecycleFactory;
+  protected boolean closed;
+
+  public DruidJdbcStatement(
+      final DruidConnection connection,
+      final int statementId,
+      final SqlLifecycleFactory lifecycleFactory
+  )
+  {
+    super(connection, statementId);
+    this.lifecycleFactory = Preconditions.checkNotNull(lifecycleFactory, 
"lifecycleFactory");
+  }
+
+  public synchronized void execute(SqlQueryPlus sqlRequest, long maxRowCount) 
throws RelConversionException
+  {
+    closeResultSet();
+    SqlLifecycle stmt = lifecycleFactory.factorize();
+    stmt.initialize(sqlRequest.sql(), connection.makeContext());
+    try {
+      stmt.validateAndAuthorize(sqlRequest.authResult());
+      resultSet = new DruidJdbcResultSet(this, sqlRequest, stmt, 
Long.MAX_VALUE);
+      resultSet.execute();
+    }
+    catch (ForbiddenException e) {
+      // Can't finalize statement in this case. Call will fail with an
+      // assertion error.
+      resultSet = null;
+      DruidMeta.logFailure(e);
+      throw e;
+    }
+    catch (Throwable t) {
+      stmt.finalizeStateAndEmitLogsAndMetrics(t, null, -1);
+      resultSet = null;
+      throw t;
+    }
+  }
+
+  @Override
+  public Meta.Signature getSignature()
+  {
+    return requireResultSet().getSignature();
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
index 781cae53da..d318bd4bf3 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
@@ -40,20 +40,19 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.QueryContext;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.Authenticator;
 import org.apache.druid.server.security.AuthenticatorMapper;
 import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.SqlQueryPlus;
 import org.apache.druid.sql.calcite.planner.Calcites;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.joda.time.Interval;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -108,7 +107,7 @@ public class DruidMeta extends MetaImpl
   private final ErrorHandler errorHandler;
 
   /**
-   * Used to track logical connections.
+   * Tracks logical connections.
    */
   private final ConcurrentMap<String, DruidConnection> connections = new 
ConcurrentHashMap<>();
 
@@ -157,16 +156,13 @@ public class DruidMeta extends MetaImpl
           }
         }
       }
-      // we don't want to stringify arrays for JDBC ever because avatica needs 
to handle this
-      final QueryContext context = new QueryContext(contextMap);
-      context.addSystemParam(PlannerContext.CTX_SQL_STRINGIFY_ARRAYS, false);
-      openDruidConnection(ch.id, secret, context);
+      openDruidConnection(ch.id, secret, contextMap);
     }
     catch (NoSuchConnectionException e) {
       throw e;
     }
     catch (Throwable t) {
-      // we want to avoid sanitizing avatica specific exceptions as the 
avatica code can rely on them to handle issues
+      // we want to avoid sanitizing Avatica specific exceptions as the 
Avatica code can rely on them to handle issues
       // differently
       throw errorHandler.sanitize(t);
     }
@@ -206,11 +202,16 @@ public class DruidMeta extends MetaImpl
     }
   }
 
+  /**
+   * Creates a new implementation of the one-pass JDBC {@code Statement}
+   * class. Corresponds to the JDBC {@code Connection.createStatement()}
+   * method.
+   */
   @Override
   public StatementHandle createStatement(final ConnectionHandle ch)
   {
     try {
-      final DruidStatement druidStatement = 
getDruidConnection(ch.id).createStatement(sqlLifecycleFactory);
+      final DruidJdbcStatement druidStatement = 
getDruidConnection(ch.id).createStatement(sqlLifecycleFactory);
       return new StatementHandle(ch.id, druidStatement.getStatementId(), null);
     }
     catch (NoSuchConnectionException e) {
@@ -221,6 +222,11 @@ public class DruidMeta extends MetaImpl
     }
   }
 
+  /**
+   * Creates a new implementation of the JDBC {@code PreparedStatement}
+   * class which allows preparing once, executing many times. Corresponds to
+   * the JDBC {@code Connection.prepareStatement()} call.
+   */
   @Override
   public StatementHandle prepare(
       final ConnectionHandle ch,
@@ -229,26 +235,19 @@ public class DruidMeta extends MetaImpl
   )
   {
     try {
-      final StatementHandle statement = createStatement(ch);
-      final DruidStatement druidStatement;
-      try {
-        druidStatement = getDruidStatement(statement);
-      }
-      catch (NoSuchStatementException e) {
-        throw logFailure(new ISE(e, e.getMessage()));
-      }
-      final DruidConnection druidConnection = 
getDruidConnection(statement.connectionId);
-      AuthenticationResult authenticationResult = 
authenticateConnection(druidConnection);
-      if (authenticationResult == null) {
-        throw logFailure(
-            new ForbiddenException("Authentication failed."),
-            "Authentication failed for statement[%s]",
-            druidStatement.getStatementId()
-        );
-      }
-      statement.signature = druidStatement.prepare(sql, maxRowCount, 
authenticationResult).getSignature();
-      LOG.debug("Successfully prepared statement[%s] for execution", 
druidStatement.getStatementId());
-      return statement;
+      final DruidConnection druidConnection = getDruidConnection(ch.id);
+      SqlQueryPlus sqlReq = new SqlQueryPlus(
+          sql,
+          null, // Context provided by connection
+          null, // No parameters in this path
+          doAuthenticate(druidConnection)
+      );
+      DruidJdbcPreparedStatement stmt = 
druidConnection.createPreparedStatement(
+          sqlLifecycleFactory,
+          sqlReq,
+          maxRowCount);
+      LOG.debug("Successfully prepared statement [%s] for execution", 
stmt.getStatementId());
+      return new StatementHandle(ch.id, stmt.getStatementId(), 
stmt.getSignature());
     }
     catch (NoSuchConnectionException e) {
       throw e;
@@ -258,6 +257,18 @@ public class DruidMeta extends MetaImpl
     }
   }
 
+  private AuthenticationResult doAuthenticate(final DruidConnection 
druidConnection)
+  {
+    AuthenticationResult authenticationResult = 
authenticateConnection(druidConnection);
+    if (authenticationResult == null) {
+      throw logFailure(
+          new ForbiddenException("Authentication failed."),
+          "Authentication failed for prepare"
+      );
+    }
+    return authenticationResult;
+  }
+
   @Deprecated
   @Override
   public ExecuteResult prepareAndExecute(
@@ -271,6 +282,9 @@ public class DruidMeta extends MetaImpl
     throw errorHandler.sanitize(new UOE("Deprecated"));
   }
 
+  /**
+   * Prepares and executes a JDBC {@code Statement}
+   */
   @Override
   public ExecuteResult prepareAndExecute(
       final StatementHandle statement,
@@ -280,39 +294,19 @@ public class DruidMeta extends MetaImpl
       final PrepareCallback callback
   ) throws NoSuchStatementException
   {
+
     try {
       // Ignore "callback", this class is designed for use with LocalService 
which doesn't use it.
-      final DruidStatement druidStatement = getDruidStatement(statement);
+      final DruidJdbcStatement druidStatement = getDruidStatement(statement, 
DruidJdbcStatement.class);
       final DruidConnection druidConnection = 
getDruidConnection(statement.connectionId);
-      AuthenticationResult authenticationResult = 
authenticateConnection(druidConnection);
-      if (authenticationResult == null) {
-        throw logFailure(
-            new ForbiddenException("Authentication failed."),
-            "Authentication failed for statement[%s]",
-            druidStatement.getStatementId()
-        );
-      }
-      druidStatement.prepare(sql, maxRowCount, authenticationResult);
-      final Frame firstFrame = druidStatement.execute(Collections.emptyList())
-                                             .nextFrame(
-                                                 DruidStatement.START_OFFSET,
-                                                 
getEffectiveMaxRowsPerFrame(maxRowsInFirstFrame)
-                                             );
-      final Signature signature = druidStatement.getSignature();
-      LOG.debug("Successfully prepared statement[%s] and started execution", 
druidStatement.getStatementId());
-      return new ExecuteResult(
-          ImmutableList.of(
-              MetaResultSet.create(
-                  statement.connectionId,
-                  statement.id,
-                  false,
-                  signature,
-                  firstFrame
-              )
-          )
-      );
-    }
-    // cannot affect these exceptions as avatica handles them
+      // No parameters for a "regular" JDBC statement.
+      SqlQueryPlus sqlRequest = new SqlQueryPlus(sql, null, null, 
doAuthenticate(druidConnection));
+      druidConnection.prepareAndExecute(druidStatement, sqlRequest, 
maxRowCount);
+      ExecuteResult result = doFetch(druidStatement, maxRowsInFirstFrame);
+      LOG.debug("Successfully prepared statement [%s] and started execution", 
druidStatement.getStatementId());
+      return result;
+    }
+    // Cannot affect these exceptions as Avatica handles them.
     catch (NoSuchConnectionException | NoSuchStatementException e) {
       throw e;
     }
@@ -321,6 +315,27 @@ public class DruidMeta extends MetaImpl
     }
   }
 
+  private ExecuteResult doFetch(AbstractDruidJdbcStatement druidStatement, int 
maxRows)
+  {
+    final Signature signature = druidStatement.getSignature();
+    final Frame firstFrame = druidStatement.nextFrame(
+                                       AbstractDruidJdbcStatement.START_OFFSET,
+                                       getEffectiveMaxRowsPerFrame(maxRows)
+                                   );
+
+    return new ExecuteResult(
+        ImmutableList.of(
+            MetaResultSet.create(
+                druidStatement.getConnectionId(),
+                druidStatement.statementId,
+                false,
+                signature,
+                firstFrame
+            )
+        )
+    );
+  }
+
   @Override
   public ExecuteBatchResult prepareAndExecuteBatch(
       final StatementHandle statement,
@@ -351,7 +366,7 @@ public class DruidMeta extends MetaImpl
     try {
       final int maxRows = getEffectiveMaxRowsPerFrame(fetchMaxRowCount);
       LOG.debug("Fetching next frame from offset[%s] with [%s] rows for 
statement[%s]", offset, maxRows, statement.id);
-      return getDruidStatement(statement).nextFrame(offset, maxRows);
+      return getDruidStatement(statement, 
AbstractDruidJdbcStatement.class).nextFrame(offset, maxRows);
     }
     catch (NoSuchConnectionException e) {
       throw e;
@@ -381,26 +396,14 @@ public class DruidMeta extends MetaImpl
   ) throws NoSuchStatementException
   {
     try {
-      final DruidStatement druidStatement = getDruidStatement(statement);
-      final Frame firstFrame = druidStatement.execute(parameterValues)
-                                             .nextFrame(
-                                                 DruidStatement.START_OFFSET,
-                                                 
getEffectiveMaxRowsPerFrame(maxRowsInFirstFrame)
-                                             );
-
-      final Signature signature = druidStatement.getSignature();
-      LOG.debug("Successfully started execution of statement[%s]", 
druidStatement.getStatementId());
-      return new ExecuteResult(
-          ImmutableList.of(
-              MetaResultSet.create(
-                  statement.connectionId,
-                  statement.id,
-                  false,
-                  signature,
-                  firstFrame
-              )
-          )
-      );
+      final DruidJdbcPreparedStatement druidStatement =
+          getDruidStatement(statement, DruidJdbcPreparedStatement.class);
+      druidStatement.execute(parameterValues);
+      ExecuteResult result = doFetch(druidStatement, maxRowsInFirstFrame);
+      LOG.debug(
+          "Successfully started execution of statement[%s]",
+          druidStatement.getStatementId());
+      return result;
     }
     catch (NoSuchStatementException | NoSuchConnectionException e) {
       throw e;
@@ -430,10 +433,7 @@ public class DruidMeta extends MetaImpl
       // connections.get, not getDruidConnection, since we want to silently 
ignore nonexistent statements
       final DruidConnection druidConnection = connections.get(h.connectionId);
       if (druidConnection != null) {
-        final DruidStatement druidStatement = 
druidConnection.getStatement(h.id);
-        if (druidStatement != null) {
-          druidStatement.close();
-        }
+        druidConnection.closeStatement(h.id);
       }
     }
     catch (NoSuchConnectionException e) {
@@ -452,7 +452,7 @@ public class DruidMeta extends MetaImpl
   ) throws NoSuchStatementException
   {
     try {
-      final DruidStatement druidStatement = getDruidStatement(sh);
+      final AbstractDruidJdbcStatement druidStatement = getDruidStatement(sh, 
AbstractDruidJdbcStatement.class);
       final boolean isDone = druidStatement.isDone();
       final long currentOffset = druidStatement.getCurrentOffset();
       if (currentOffset != offset) {
@@ -729,7 +729,7 @@ public class DruidMeta extends MetaImpl
   private DruidConnection openDruidConnection(
       final String connectionId,
       final Map<String, Object> userSecret,
-      final QueryContext context
+      final Map<String, Object> context
   )
   {
     if (connectionCount.incrementAndGet() > config.getMaxConnections()) {
@@ -804,14 +804,22 @@ public class DruidMeta extends MetaImpl
   }
 
   @Nonnull
-  private DruidStatement getDruidStatement(final StatementHandle statement) 
throws NoSuchStatementException
+  private <T extends AbstractDruidJdbcStatement> T getDruidStatement(
+      final StatementHandle statement,
+      final Class<T> stmtClass
+  ) throws NoSuchStatementException
   {
     final DruidConnection connection = 
getDruidConnection(statement.connectionId);
-    final DruidStatement druidStatement = 
connection.getStatement(statement.id);
+    final AbstractDruidJdbcStatement druidStatement = 
connection.getStatement(statement.id);
     if (druidStatement == null) {
       throw logFailure(new NoSuchStatementException(statement));
     }
-    return druidStatement;
+    try {
+      return stmtClass.cast(druidStatement);
+    }
+    catch (ClassCastException e) {
+      throw logFailure(new NoSuchStatementException(statement));
+    }
   }
 
   private MetaResultSet sqlResultSet(final ConnectionHandle ch, final String 
sql)
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java
deleted file mode 100644
index b3c7e41284..0000000000
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java
+++ /dev/null
@@ -1,449 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.sql.avatica;
-
-import com.google.common.base.Preconditions;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.apache.calcite.avatica.AvaticaParameter;
-import org.apache.calcite.avatica.ColumnMetaData;
-import org.apache.calcite.avatica.Meta;
-import org.apache.calcite.avatica.remote.TypedValue;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.guava.Yielders;
-import org.apache.druid.query.QueryContext;
-import org.apache.druid.server.security.AuthenticationResult;
-import org.apache.druid.server.security.ForbiddenException;
-import org.apache.druid.sql.SqlLifecycle;
-import org.apache.druid.sql.calcite.planner.Calcites;
-import org.apache.druid.sql.calcite.planner.PrepareResult;
-
-import java.io.Closeable;
-import java.sql.Array;
-import java.sql.DatabaseMetaData;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-/**
- * Statement handle for {@link DruidMeta}. Thread-safe.
- */
-public class DruidStatement implements Closeable
-{
-  public static final long START_OFFSET = 0;
-  private final String connectionId;
-  private final int statementId;
-  private final QueryContext queryContext;
-  @GuardedBy("lock")
-  private final SqlLifecycle sqlLifecycle;
-  private final Runnable onClose;
-  private final Object lock = new Object();
-  /**
-   * Query metrics can only be used within a single thread. Because results 
can be paginated into multiple
-   * JDBC frames (each frame being processed by a potentially different 
thread), the thread that closes the yielder
-   * (resulting in a QueryMetrics emit() call) may not be the same thread that 
created the yielder (which initializes
-   * DefaultQueryMetrics with the current thread as the owner). Create and 
close the yielder with this
-   * single-thread executor to prevent this from happening.
-   * <p>
-   * The thread owner check in DefaultQueryMetrics is more aggressive than 
needed for this specific JDBC case, since
-   * the JDBC frames are processed sequentially. If the thread owner check is 
changed/loosened to permit this use case,
-   * we would not need to use this executor.
-   * <p>
-   * See discussion at:
-   * https://github.com/apache/druid/pull/4288
-   * https://github.com/apache/druid/pull/4415
-   */
-  private final ExecutorService yielderOpenCloseExecutor;
-  private State state = State.NEW;
-  private String query;
-  private long maxRowCount;
-  private Meta.Signature signature;
-  private Yielder<Object[]> yielder;
-  private int offset = 0;
-  private Throwable throwable;
-  private AuthenticationResult authenticationResult;
-
-  public DruidStatement(
-      final String connectionId,
-      final int statementId,
-      final QueryContext queryContext,
-      final SqlLifecycle sqlLifecycle,
-      final Runnable onClose
-  )
-  {
-    this.connectionId = Preconditions.checkNotNull(connectionId, 
"connectionId");
-    this.statementId = statementId;
-    this.queryContext = queryContext;
-    this.sqlLifecycle = Preconditions.checkNotNull(sqlLifecycle, 
"sqlLifecycle");
-    this.onClose = Preconditions.checkNotNull(onClose, "onClose");
-    this.yielderOpenCloseExecutor = Execs.singleThreaded(
-        StringUtils.format(
-            "JDBCYielderOpenCloseExecutor-connection-%s-statement-%d",
-            StringUtils.encodeForFormat(connectionId),
-            statementId
-        )
-    );
-  }
-
-  public static List<ColumnMetaData> createColumnMetaData(final RelDataType 
rowType)
-  {
-    final List<ColumnMetaData> columns = new ArrayList<>();
-    List<RelDataTypeField> fieldList = rowType.getFieldList();
-
-    for (int i = 0; i < fieldList.size(); i++) {
-      RelDataTypeField field = fieldList.get(i);
-
-      final ColumnMetaData.AvaticaType columnType;
-      if (field.getType().getSqlTypeName() == SqlTypeName.ARRAY) {
-        final ColumnMetaData.Rep elementRep = 
rep(field.getType().getComponentType().getSqlTypeName());
-        final ColumnMetaData.ScalarType elementType = ColumnMetaData.scalar(
-            
field.getType().getComponentType().getSqlTypeName().getJdbcOrdinal(),
-            field.getType().getComponentType().getSqlTypeName().getName(),
-            elementRep
-        );
-        final ColumnMetaData.Rep arrayRep = 
rep(field.getType().getSqlTypeName());
-        columnType = ColumnMetaData.array(
-            elementType,
-            field.getType().getSqlTypeName().getName(),
-            arrayRep
-        );
-      } else {
-        final ColumnMetaData.Rep rep = rep(field.getType().getSqlTypeName());
-        columnType = ColumnMetaData.scalar(
-            field.getType().getSqlTypeName().getJdbcOrdinal(),
-            field.getType().getSqlTypeName().getName(),
-            rep
-        );
-      }
-      columns.add(
-          new ColumnMetaData(
-              i, // ordinal
-              false, // auto increment
-              true, // case sensitive
-              false, // searchable
-              false, // currency
-              field.getType().isNullable()
-              ? DatabaseMetaData.columnNullable
-              : DatabaseMetaData.columnNoNulls, // nullable
-              true, // signed
-              field.getType().getPrecision(), // display size
-              field.getName(), // label
-              null, // column name
-              null, // schema name
-              field.getType().getPrecision(), // precision
-              field.getType().getScale(), // scale
-              null, // table name
-              null, // catalog name
-              columnType, // avatica type
-              true, // read only
-              false, // writable
-              false, // definitely writable
-              columnType.columnClassName() // column class name
-          )
-      );
-    }
-
-    return columns;
-  }
-
-  public DruidStatement prepare(
-      final String query,
-      final long maxRowCount,
-      final AuthenticationResult authenticationResult
-  )
-  {
-    synchronized (lock) {
-      try {
-        ensure(State.NEW);
-        sqlLifecycle.initialize(query, queryContext);
-        sqlLifecycle.validateAndAuthorize(authenticationResult);
-        this.authenticationResult = authenticationResult;
-        PrepareResult prepareResult = sqlLifecycle.prepare();
-        this.maxRowCount = maxRowCount;
-        this.query = query;
-        List<AvaticaParameter> params = new ArrayList<>();
-        final RelDataType parameterRowType = 
prepareResult.getParameterRowType();
-        for (RelDataTypeField field : parameterRowType.getFieldList()) {
-          RelDataType type = field.getType();
-          params.add(createParameter(field, type));
-        }
-        this.signature = Meta.Signature.create(
-            createColumnMetaData(prepareResult.getRowType()),
-            query,
-            params,
-            Meta.CursorFactory.ARRAY,
-            Meta.StatementType.SELECT // We only support SELECT
-        );
-        this.state = State.PREPARED;
-      }
-      catch (Throwable t) {
-        return closeAndPropagateThrowable(t);
-      }
-
-      return this;
-    }
-  }
-
-
-  public DruidStatement execute(List<TypedValue> parameters)
-  {
-    synchronized (lock) {
-      ensure(State.PREPARED);
-      try {
-        sqlLifecycle.setParameters(parameters);
-        sqlLifecycle.validateAndAuthorize(authenticationResult);
-        sqlLifecycle.plan();
-        final Sequence<Object[]> baseSequence = 
yielderOpenCloseExecutor.submit(sqlLifecycle::execute).get();
-
-        // We can't apply limits greater than Integer.MAX_VALUE, ignore them.
-        final Sequence<Object[]> retSequence =
-            maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE
-            ? baseSequence.limit((int) maxRowCount)
-            : baseSequence;
-
-        yielder = Yielders.each(retSequence);
-        state = State.RUNNING;
-      }
-      catch (Throwable t) {
-        closeAndPropagateThrowable(t);
-      }
-
-      return this;
-    }
-  }
-
-  public String getConnectionId()
-  {
-    return connectionId;
-  }
-
-  public int getStatementId()
-  {
-    return statementId;
-  }
-
-  public String getQuery()
-  {
-    synchronized (lock) {
-      ensure(State.PREPARED, State.RUNNING, State.DONE);
-      return query;
-    }
-  }
-
-  public Meta.Signature getSignature()
-  {
-    synchronized (lock) {
-      ensure(State.PREPARED, State.RUNNING, State.DONE);
-      return signature;
-    }
-  }
-
-  public long getCurrentOffset()
-  {
-    synchronized (lock) {
-      ensure(State.RUNNING, State.DONE);
-      return offset;
-    }
-  }
-
-  public boolean isDone()
-  {
-    synchronized (lock) {
-      return state == State.DONE;
-    }
-  }
-
-  public Meta.Frame nextFrame(final long fetchOffset, final int 
fetchMaxRowCount)
-  {
-    synchronized (lock) {
-      ensure(State.RUNNING);
-      Preconditions.checkState(fetchOffset == offset, "fetchOffset[%,d] != 
offset[%,d]", fetchOffset, offset);
-
-      try {
-        final List<Object> rows = new ArrayList<>();
-        while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset < 
fetchOffset + fetchMaxRowCount)) {
-          rows.add(yielder.get());
-          yielder = yielder.next(null);
-          offset++;
-        }
-
-        final boolean done = yielder.isDone();
-        if (done) {
-          close();
-        }
-
-        return new Meta.Frame(fetchOffset, done, rows);
-      }
-      catch (Throwable t) {
-        this.throwable = t;
-        try {
-          close();
-        }
-        catch (Throwable t1) {
-          t.addSuppressed(t1);
-        }
-        throw t;
-      }
-    }
-  }
-
-  @Override
-  public void close()
-  {
-    State oldState = null;
-    try {
-      synchronized (lock) {
-        oldState = state;
-        state = State.DONE;
-        if (yielder != null) {
-          Yielder<Object[]> theYielder = this.yielder;
-          this.yielder = null;
-
-          // Put the close last, so any exceptions it throws are after we did 
the other cleanup above.
-          yielderOpenCloseExecutor.submit(
-              () -> {
-                theYielder.close();
-                // makes this a Callable instead of Runnable so we don't need 
to catch exceptions inside the lambda
-                return null;
-              }
-          ).get();
-
-          yielderOpenCloseExecutor.shutdownNow();
-        }
-      }
-    }
-    catch (Throwable t) {
-      if (oldState != State.DONE) {
-        // First close. Run the onClose function.
-        try {
-          onClose.run();
-          synchronized (lock) {
-            sqlLifecycle.finalizeStateAndEmitLogsAndMetrics(t, null, -1);
-          }
-        }
-        catch (Throwable t1) {
-          t.addSuppressed(t1);
-        }
-      }
-
-      throw new RuntimeException(t);
-    }
-
-    if (oldState != State.DONE) {
-      // First close. Run the onClose function.
-      try {
-        if (!(this.throwable instanceof ForbiddenException)) {
-          synchronized (lock) {
-            sqlLifecycle.finalizeStateAndEmitLogsAndMetrics(this.throwable, 
null, -1);
-          }
-        } else {
-          DruidMeta.logFailure(this.throwable);
-        }
-        onClose.run();
-      }
-      catch (Throwable t) {
-        throw new RuntimeException(t);
-      }
-    }
-  }
-
-  private AvaticaParameter createParameter(RelDataTypeField field, RelDataType 
type)
-  {
-    // signed is always false because no way to extract from RelDataType, and 
the only usage of this AvaticaParameter
-    // constructor I can find, in CalcitePrepareImpl, does it this way with 
hard coded false
-    return new AvaticaParameter(
-        false,
-        type.getPrecision(),
-        type.getScale(),
-        type.getSqlTypeName().getJdbcOrdinal(),
-        type.getSqlTypeName().getName(),
-        Calcites.sqlTypeNameJdbcToJavaClass(type.getSqlTypeName()).getName(),
-        field.getName()
-    );
-  }
-
-
-  private DruidStatement closeAndPropagateThrowable(Throwable t)
-  {
-    this.throwable = t;
-    DruidMeta.logFailure(t);
-    try {
-      close();
-    }
-    catch (Throwable t1) {
-      t.addSuppressed(t1);
-    }
-    throw new RuntimeException(t);
-  }
-
-  @GuardedBy("lock")
-  private void ensure(final State... desiredStates)
-  {
-    for (State desiredState : desiredStates) {
-      if (state == desiredState) {
-        return;
-      }
-    }
-    throw new ISE("Invalid action for state[%s]", state);
-  }
-
-  private static ColumnMetaData.Rep rep(final SqlTypeName sqlType)
-  {
-    if (SqlTypeName.CHAR_TYPES.contains(sqlType)) {
-      return ColumnMetaData.Rep.of(String.class);
-    } else if (sqlType == SqlTypeName.TIMESTAMP) {
-      return ColumnMetaData.Rep.of(Long.class);
-    } else if (sqlType == SqlTypeName.DATE) {
-      return ColumnMetaData.Rep.of(Integer.class);
-    } else if (sqlType == SqlTypeName.INTEGER) {
-      // use Number.class for exact numeric types since JSON transport might 
switch longs to integers
-      return ColumnMetaData.Rep.of(Number.class);
-    } else if (sqlType == SqlTypeName.BIGINT) {
-      // use Number.class for exact numeric types since JSON transport might 
switch longs to integers
-      return ColumnMetaData.Rep.of(Number.class);
-    } else if (sqlType == SqlTypeName.FLOAT) {
-      return ColumnMetaData.Rep.of(Float.class);
-    } else if (sqlType == SqlTypeName.DOUBLE || sqlType == 
SqlTypeName.DECIMAL) {
-      return ColumnMetaData.Rep.of(Double.class);
-    } else if (sqlType == SqlTypeName.BOOLEAN) {
-      return ColumnMetaData.Rep.of(Boolean.class);
-    } else if (sqlType == SqlTypeName.OTHER) {
-      return ColumnMetaData.Rep.of(Object.class);
-    } else if (sqlType == SqlTypeName.ARRAY) {
-      return ColumnMetaData.Rep.of(Array.class);
-    } else {
-      throw new ISE("No rep for SQL type[%s]", sqlType);
-    }
-  }
-
-  enum State
-  {
-    NEW,
-    PREPARED,
-    RUNNING,
-    DONE
-  }
-}
diff --git 
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java 
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 0e7128b069..81167d919f 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -54,6 +54,7 @@ import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.DefaultQueryConfig;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.QuerySchedulerProvider;
@@ -109,22 +110,29 @@ import java.sql.Statement;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 
-public abstract class DruidAvaticaHandlerTest extends CalciteTestBase
+/**
+ * Tests the Avatica-based JDBC implementation using JSON serialization. See
+ * {@link DruidAvaticaProtobufHandlerTest} for a subclass which runs
+ * this same set of tests using Protobuf serialization.
+ */
+public class DruidAvaticaHandlerTest extends CalciteTestBase
 {
   private static final AvaticaServerConfig AVATICA_CONFIG = new 
AvaticaServerConfig()
   {
     @Override
     public int getMaxConnections()
     {
-      // This must match the number of Connection objects created in setUp()
+      // This must match the number of Connection objects created in 
testTooManyStatements()
       return 4;
     }
 
@@ -146,6 +154,7 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
   {
     resourceCloser = Closer.create();
     conglomerate = 
QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
+    System.setProperty("user.timezone", "UTC");
   }
 
   @AfterClass
@@ -260,139 +269,155 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
   }
 
   @Test
-  public void testSelectCount() throws Exception
+  public void testSelectCount() throws SQLException
   {
-    final ResultSet resultSet = client.createStatement().executeQuery("SELECT 
COUNT(*) AS cnt FROM druid.foo");
-    final List<Map<String, Object>> rows = getRows(resultSet);
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableMap.of("cnt", 6L)
-        ),
-        rows
-    );
+    try (Statement stmt = client.createStatement()) {
+      final ResultSet resultSet = stmt.executeQuery("SELECT COUNT(*) AS cnt 
FROM druid.foo");
+      final List<Map<String, Object>> rows = getRows(resultSet);
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of("cnt", 6L)
+          ),
+          rows
+      );
+    }
   }
 
   @Test
-  public void testSelectCountNoTrailingSlash() throws Exception
+  public void testSelectCountNoTrailingSlash() throws SQLException
   {
-    final ResultSet resultSet = 
clientNoTrailingSlash.createStatement().executeQuery("SELECT COUNT(*) AS cnt 
FROM druid.foo");
-    final List<Map<String, Object>> rows = getRows(resultSet);
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableMap.of("cnt", 6L)
-        ),
-        rows
-    );
+    try (Statement stmt = clientNoTrailingSlash.createStatement()) {
+      final ResultSet resultSet = stmt.executeQuery("SELECT COUNT(*) AS cnt 
FROM druid.foo");
+      final List<Map<String, Object>> rows = getRows(resultSet);
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of("cnt", 6L)
+          ),
+          rows
+      );
+    }
   }
 
   @Test
-  public void testSelectCountAlternateStyle() throws Exception
+  public void testSelectCountAlternateStyle() throws SQLException
   {
-    final ResultSet resultSet = client.prepareStatement("SELECT COUNT(*) AS 
cnt FROM druid.foo").executeQuery();
-    final List<Map<String, Object>> rows = getRows(resultSet);
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableMap.of("cnt", 6L)
-        ),
-        rows
-    );
+    try (PreparedStatement stmt = client.prepareStatement("SELECT COUNT(*) AS 
cnt FROM druid.foo")) {
+      final ResultSet resultSet = stmt.executeQuery();
+      final List<Map<String, Object>> rows = getRows(resultSet);
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of("cnt", 6L)
+          ),
+          rows
+      );
+    }
   }
 
   @Test
-  public void testTimestampsInResponse() throws Exception
+  public void testTimestampsInResponse() throws SQLException
   {
-    final ResultSet resultSet = client.createStatement().executeQuery(
-        "SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1"
-    );
+    try (Statement stmt = client.createStatement()) {
+      final ResultSet resultSet = stmt.executeQuery(
+          "SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1"
+      );
 
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableMap.of(
-                "__time", new 
Timestamp(DateTimes.of("2000-01-01T00:00:00.000Z").getMillis()),
-                "t2", new Date(DateTimes.of("2000-01-01").getMillis())
-            )
-        ),
-        getRows(resultSet)
-    );
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of(
+                  "__time", new 
Timestamp(DateTimes.of("2000-01-01T00:00:00.000Z").getMillis()),
+                  "t2", new Date(DateTimes.of("2000-01-01").getMillis())
+              )
+          ),
+          getRows(resultSet)
+      );
+    }
   }
 
   @Test
-  public void testTimestampsInResponseLosAngelesTimeZone() throws Exception
+  public void testTimestampsInResponseLosAngelesTimeZone() throws SQLException
   {
-    final ResultSet resultSet = 
clientLosAngeles.createStatement().executeQuery(
-        "SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1"
-    );
+    try (Statement stmt = clientLosAngeles.createStatement()) {
+      final ResultSet resultSet = stmt.executeQuery(
+          "SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1"
+      );
 
-    final DateTimeZone timeZone = 
DateTimes.inferTzFromString("America/Los_Angeles");
-    final DateTime localDateTime = new DateTime("2000-01-01T00Z", timeZone);
+      final DateTimeZone timeZone = 
DateTimes.inferTzFromString("America/Los_Angeles");
+      final DateTime localDateTime = new DateTime("2000-01-01T00Z", timeZone);
 
-    final List<Map<String, Object>> resultRows = getRows(resultSet);
+      final List<Map<String, Object>> resultRows = getRows(resultSet);
 
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableMap.of(
-                "__time", new 
Timestamp(Calcites.jodaToCalciteTimestamp(localDateTime, timeZone)),
-                "t2", new 
Date(Calcites.jodaToCalciteTimestamp(localDateTime.dayOfMonth().roundFloorCopy(),
 timeZone))
-            )
-        ),
-        resultRows
-    );
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of(
+                  "__time", new 
Timestamp(Calcites.jodaToCalciteTimestamp(localDateTime, timeZone)),
+                  "t2", new 
Date(Calcites.jodaToCalciteTimestamp(localDateTime.dayOfMonth().roundFloorCopy(),
 timeZone))
+              )
+          ),
+          resultRows
+      );
+    }
   }
 
   @Test
-  public void testFieldAliasingSelect() throws Exception
+  public void testFieldAliasingSelect() throws SQLException
   {
-    final ResultSet resultSet = client.createStatement().executeQuery(
-        "SELECT dim2 AS \"x\", dim2 AS \"y\" FROM druid.foo LIMIT 1"
-    );
+    try (Statement stmt = client.createStatement()) {
+      final ResultSet resultSet = stmt.executeQuery(
+          "SELECT dim2 AS \"x\", dim2 AS \"y\" FROM druid.foo LIMIT 1"
+      );
 
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableMap.of("x", "a", "y", "a")
-        ),
-        getRows(resultSet)
-    );
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of("x", "a", "y", "a")
+          ),
+          getRows(resultSet)
+      );
+    }
   }
 
   @Test
-  public void testSelectBoolean() throws Exception
+  public void testSelectBoolean() throws SQLException
   {
-    final ResultSet resultSet = client.createStatement().executeQuery(
-        "SELECT dim2, dim2 IS NULL AS isnull FROM druid.foo LIMIT 1"
-    );
+    try (Statement stmt = client.createStatement()) {
+      final ResultSet resultSet = stmt.executeQuery(
+          "SELECT dim2, dim2 IS NULL AS isnull FROM druid.foo LIMIT 1"
+      );
 
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableMap.of("dim2", "a", "isnull", false)
-        ),
-        getRows(resultSet)
-    );
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of("dim2", "a", "isnull", false)
+          ),
+          getRows(resultSet)
+      );
+    }
   }
 
   @Test
-  public void testExplainSelectCount() throws Exception
+  public void testExplainSelectCount() throws SQLException
   {
-    final ResultSet resultSet = 
clientLosAngeles.createStatement().executeQuery(
-        "EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo"
-    );
+    try (Statement stmt = clientLosAngeles.createStatement()) {
+      final ResultSet resultSet = stmt.executeQuery(
+          "EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo"
+      );
 
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableMap.of(
-                "PLAN",
-                
StringUtils.format("DruidQueryRel(query=[{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"context\":{\"sqlQueryId\":\"%s\",\"sqlStringifyArrays\":false,\"sqlTimeZone\":\"America/Los_Angeles\"}}],
 signature=[{a0:LONG}])\n",
-                                   DUMMY_SQL_QUERY_ID
-                ),
-                "RESOURCES",
-                "[{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]"
-            )
-        ),
-        getRows(resultSet)
-    );
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of(
+                  "PLAN",
+                  
StringUtils.format("DruidQueryRel(query=[{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"context\":{\"sqlQueryId\":\"%s\",\"sqlStringifyArrays\":false,\"sqlTimeZone\":\"America/Los_Angeles\"}}],
 signature=[{a0:LONG}])\n",
+                                     DUMMY_SQL_QUERY_ID
+                  ),
+                  "RESOURCES",
+                  "[{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]"
+              )
+          ),
+          getRows(resultSet)
+      );
+    }
   }
 
   @Test
-  public void testDatabaseMetaDataCatalogs() throws Exception
+  public void testDatabaseMetaDataCatalogs() throws SQLException
   {
     final DatabaseMetaData metaData = client.getMetaData();
     Assert.assertEquals(
@@ -404,7 +429,7 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
   }
 
   @Test
-  public void testDatabaseMetaDataSchemas() throws Exception
+  public void testDatabaseMetaDataSchemas() throws SQLException
   {
     final DatabaseMetaData metaData = client.getMetaData();
     Assert.assertEquals(
@@ -416,7 +441,7 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
   }
 
   @Test
-  public void testDatabaseMetaDataTables() throws Exception
+  public void testDatabaseMetaDataTables() throws SQLException
   {
     final DatabaseMetaData metaData = client.getMetaData();
     Assert.assertEquals(
@@ -485,7 +510,7 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
   }
 
   @Test
-  public void testDatabaseMetaDataTablesAsSuperuser() throws Exception
+  public void testDatabaseMetaDataTablesAsSuperuser() throws SQLException
   {
     final DatabaseMetaData metaData = superuserClient.getMetaData();
     Assert.assertEquals(
@@ -559,7 +584,7 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
   }
 
   @Test
-  public void testDatabaseMetaDataColumns() throws Exception
+  public void testDatabaseMetaDataColumns() throws SQLException
   {
     final DatabaseMetaData metaData = client.getMetaData();
     Assert.assertEquals(
@@ -637,7 +662,7 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
   }
 
   @Test
-  public void testDatabaseMetaDataColumnsOnForbiddenDatasource() throws 
Exception
+  public void testDatabaseMetaDataColumnsOnForbiddenDatasource() throws 
SQLException
   {
     final DatabaseMetaData metaData = client.getMetaData();
     Assert.assertEquals(
@@ -650,7 +675,7 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
   }
 
   @Test
-  public void testDatabaseMetaDataColumnsWithSuperuser() throws Exception
+  public void testDatabaseMetaDataColumnsWithSuperuser() throws SQLException
   {
     final DatabaseMetaData metaData = superuserClient.getMetaData();
     Assert.assertEquals(
@@ -719,9 +744,8 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
     );
   }
 
-
   @Test(timeout = 90_000L)
-  public void testConcurrentQueries() throws Exception
+  public void testConcurrentQueries() throws InterruptedException, 
ExecutionException
   {
     final List<ListenableFuture<Integer>> futures = new ArrayList<>();
     final ListeningExecutorService exec = MoreExecutors.listeningDecorator(
@@ -749,23 +773,24 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
     for (int i = 0; i < 2000; i++) {
       Assert.assertEquals(i + 6, (int) integers.get(i));
     }
+    exec.shutdown();
   }
 
   @Test
-  public void testTooManyStatements() throws Exception
+  public void testTooManyStatements() throws SQLException
   {
-    final Statement statement1 = client.createStatement();
-    final Statement statement2 = client.createStatement();
-    final Statement statement3 = client.createStatement();
-    final Statement statement4 = client.createStatement();
+    client.createStatement();
+    client.createStatement();
+    client.createStatement();
+    client.createStatement();
 
     expectedException.expect(AvaticaClientRuntimeException.class);
-    expectedException.expectMessage("Too many open statements, limit is[4]");
-    final Statement statement5 = client.createStatement();
+    expectedException.expectMessage("Too many open statements, limit is [4]");
+    client.createStatement();
   }
 
   @Test
-  public void testNotTooManyStatementsWhenYouCloseThem() throws Exception
+  public void testNotTooManyStatementsWhenYouCloseThem() throws SQLException
   {
     client.createStatement().close();
     client.createStatement().close();
@@ -777,54 +802,75 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
     client.createStatement().close();
     client.createStatement().close();
     client.createStatement().close();
-
-    Assert.assertTrue(true);
   }
 
+  /**
+   * JDBC allows sequential reuse of statements. A statement is not closed 
until
+   * the application closes it (or the connection), but the statement's result 
set
+   * is closed on each EOF.
+   */
   @Test
-  public void testNotTooManyStatementsWhenYouFullyIterateThem() throws 
Exception
+  public void testManyUsesOfTheSameStatement() throws SQLException
   {
-    for (int i = 0; i < 50; i++) {
-      final ResultSet resultSet = client.createStatement().executeQuery(
-          "SELECT COUNT(*) AS cnt FROM druid.foo"
-      );
-      Assert.assertEquals(
-          ImmutableList.of(
-              ImmutableMap.of("cnt", 6L)
-          ),
-          getRows(resultSet)
-      );
+    try (Statement statement = client.createStatement()) {
+      for (int i = 0; i < 50; i++) {
+        final ResultSet resultSet = statement.executeQuery(
+            "SELECT COUNT(*) AS cnt FROM druid.foo"
+        );
+        Assert.assertEquals(
+            ImmutableList.of(
+                ImmutableMap.of("cnt", 6L)
+            ),
+            getRows(resultSet)
+        );
+      }
     }
-
-    Assert.assertTrue(true);
   }
 
+  /**
+   * Statements should not be closed if they encounter an error. The {@code 
ResultSet}
+   * can be closed, but not the statement.
+   */
   @Test
-  public void testNotTooManyStatementsWhenTheyThrowErrors() throws Exception
+  public void testErrorsDoNotCloseStatements() throws SQLException
   {
-    for (int i = 0; i < 50; i++) {
-      Exception thrown = null;
+    try (Statement statement = client.createStatement()) {
       try {
-        client.createStatement().executeQuery("SELECT SUM(nonexistent) FROM 
druid.foo");
+        statement.executeQuery("SELECT SUM(nonexistent) FROM druid.foo");
+        Assert.fail();
       }
       catch (Exception e) {
-        thrown = e;
+        // Expected
       }
 
-      Assert.assertNotNull(thrown);
-
-      final ResultSet resultSet = 
client.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo");
+      final ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) AS 
cnt FROM druid.foo");
       Assert.assertEquals(
           ImmutableList.of(ImmutableMap.of("cnt", 6L)),
           getRows(resultSet)
       );
     }
+  }
 
-    Assert.assertTrue(true);
+  /**
+   * Since errors do not close statements, they must be closed by the 
application,
+   * preferably in a try-with-resources block.
+   */
+  @Test
+  public void testNotTooManyStatementsWhenClosed()
+  {
+    for (int i = 0; i < 50; i++) {
+      try (Statement statement = client.createStatement()) {
+        statement.executeQuery("SELECT SUM(nonexistent) FROM druid.foo");
+        Assert.fail();
+      }
+      catch (Exception e) {
+        // Expected
+      }
+    }
   }
 
   @Test
-  public void testAutoReconnectOnNoSuchConnection() throws Exception
+  public void testAutoReconnectOnNoSuchConnection() throws SQLException
   {
     for (int i = 0; i < 50; i++) {
       final ResultSet resultSet = 
client.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo");
@@ -834,12 +880,10 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
       );
       druidMeta.closeAllConnections();
     }
-
-    Assert.assertTrue(true);
   }
 
   @Test
-  public void testTooManyConnections() throws Exception
+  public void testTooManyConnections() throws SQLException
   {
     client.createStatement();
     clientLosAngeles.createStatement();
@@ -849,23 +893,16 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
     expectedException.expect(AvaticaClientRuntimeException.class);
     expectedException.expectMessage("Too many connections");
 
-    final Connection connection5 = DriverManager.getConnection(url);
+    DriverManager.getConnection(url);
   }
 
   @Test
-  public void testNotTooManyConnectionsWhenTheyAreEmpty() throws Exception
+  public void testNotTooManyConnectionsWhenTheyAreEmpty() throws SQLException
   {
-    final Connection connection1 = DriverManager.getConnection(url);
-    connection1.createStatement().close();
-
-    final Connection connection2 = DriverManager.getConnection(url);
-    connection2.createStatement().close();
-
-    final Connection connection3 = DriverManager.getConnection(url);
-    connection3.createStatement().close();
-
-    final Connection connection4 = DriverManager.getConnection(url);
-    Assert.assertTrue(true);
+    for (int i = 0; i < 4; i++) {
+      try (Connection connection = DriverManager.getConnection(url)) {
+      }
+    }
   }
 
   @Test
@@ -957,7 +994,6 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
     );
   }
 
-
   @Test
   public void testMinRowsPerFrame() throws Exception
   {
@@ -1031,7 +1067,7 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
     String smallFrameUrl = this.getJdbcConnectionString(port);
     Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl, 
"regularUser", "druid");
 
-    // use a prepared statement because avatica currently ignores fetchSize on 
the initial fetch of a Statement
+    // use a prepared statement because Avatica currently ignores fetchSize on 
the initial fetch of a Statement
     PreparedStatement statement = smallFrameClient.prepareStatement("SELECT 
dim1 FROM druid.foo");
     // set a fetch size below the minimum configured threshold
     statement.setFetchSize(2);
@@ -1053,12 +1089,14 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
   }
 
   @Test
-  @SuppressWarnings("unchecked")
-  public void testSqlRequestLog() throws Exception
+  public void testSqlRequestLog() throws SQLException
   {
     // valid sql
+    testRequestLogger.clear();
     for (int i = 0; i < 3; i++) {
-      client.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM 
druid.foo");
+      try (Statement stmt = client.createStatement()) {
+        stmt.executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo");
+      }
     }
     Assert.assertEquals(3, testRequestLogger.getSqlQueryLogs().size());
     for (RequestLogLine logLine : testRequestLogger.getSqlQueryLogs()) {
@@ -1071,107 +1109,188 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
 
     // invalid sql
     testRequestLogger.clear();
-    try {
-      client.createStatement().executeQuery("SELECT notexist FROM druid.foo");
+    try (Statement stmt = client.createStatement()) {
+      stmt.executeQuery("SELECT notexist FROM druid.foo");
       Assert.fail("invalid SQL should throw SQLException");
     }
     catch (SQLException e) {
+      // Expected
     }
     Assert.assertEquals(1, testRequestLogger.getSqlQueryLogs().size());
-    final Map<String, Object> stats = 
testRequestLogger.getSqlQueryLogs().get(0).getQueryStats().getStats();
-    Assert.assertEquals(false, stats.get("success"));
-    Assert.assertEquals("regularUser", stats.get("identity"));
-    Assert.assertTrue(stats.containsKey("exception"));
+    {
+      final Map<String, Object> stats = 
testRequestLogger.getSqlQueryLogs().get(0).getQueryStats().getStats();
+      Assert.assertEquals(false, stats.get("success"));
+      Assert.assertEquals("regularUser", stats.get("identity"));
+      Assert.assertTrue(stats.containsKey("exception"));
+    }
 
     // unauthorized sql
     testRequestLogger.clear();
-    try {
-      client.createStatement().executeQuery("SELECT count(*) FROM 
druid.forbiddenDatasource");
+    try (Statement stmt = client.createStatement()) {
+      stmt.executeQuery("SELECT count(*) FROM druid.forbiddenDatasource");
       Assert.fail("unauthorzed SQL should throw SQLException");
     }
     catch (SQLException e) {
+      // Expected
     }
+    // SqlLifecycle does not allow logging for security failures.
     Assert.assertEquals(0, testRequestLogger.getSqlQueryLogs().size());
   }
 
   @Test
-  public void testParameterBinding() throws Exception
+  public void testSqlRequestLogPrepared() throws SQLException
   {
-    PreparedStatement statement = client.prepareStatement("SELECT COUNT(*) AS 
cnt FROM druid.foo WHERE dim1 = ? OR dim1 = ?");
-    statement.setString(1, "abc");
-    statement.setString(2, "def");
-    final ResultSet resultSet = statement.executeQuery();
-    final List<Map<String, Object>> rows = getRows(resultSet);
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableMap.of("cnt", 2L)
-        ),
-        rows
-    );
+    // valid sql
+    testRequestLogger.clear();
+    for (int i = 0; i < 3; i++) {
+      try (PreparedStatement stmt = client.prepareStatement("SELECT COUNT(*) 
AS cnt FROM druid.foo")) {
+        stmt.execute();
+      }
+    }
+    Assert.assertEquals(6, testRequestLogger.getSqlQueryLogs().size());
+    for (RequestLogLine logLine : testRequestLogger.getSqlQueryLogs()) {
+      final Map<String, Object> stats = logLine.getQueryStats().getStats();
+      Assert.assertEquals(true, stats.get("success"));
+      Assert.assertEquals("regularUser", stats.get("identity"));
+      Assert.assertTrue(stats.containsKey("sqlQuery/time"));
+      Assert.assertTrue(stats.containsKey("sqlQuery/bytes"));
+    }
+
+    // invalid sql
+    testRequestLogger.clear();
+    try (PreparedStatement stmt = client.prepareStatement("SELECT notexist 
FROM druid.foo")) {
+      Assert.fail("invalid SQL should throw SQLException");
+    }
+    catch (SQLException e) {
+      // Expected
+    }
+    Assert.assertEquals(1, testRequestLogger.getSqlQueryLogs().size());
+    {
+      final Map<String, Object> stats = 
testRequestLogger.getSqlQueryLogs().get(0).getQueryStats().getStats();
+      Assert.assertEquals(false, stats.get("success"));
+      Assert.assertEquals("regularUser", stats.get("identity"));
+      Assert.assertTrue(stats.containsKey("exception"));
+    }
+
+    // unauthorized sql
+    testRequestLogger.clear();
+    try (PreparedStatement stmt = client.prepareStatement("SELECT count(*) 
FROM druid.forbiddenDatasource")) {
+      Assert.fail("unauthorized SQL should throw SQLException");
+    }
+    catch (SQLException e) {
+      // Expected
+    }
+    // SqlLifecycle does not allow logging for security failures.
+    Assert.assertEquals(0, testRequestLogger.getSqlQueryLogs().size());
   }
 
   @Test
-  public void testSysTableParameterBindingRegularUser() throws Exception
+  public void testParameterBinding() throws SQLException
   {
-    PreparedStatement statement =
-        client.prepareStatement("SELECT COUNT(*) AS cnt FROM sys.servers WHERE 
servers.host = ?");
-    statement.setString(1, "dummy");
-
-    Assert.assertThrows(
-        "Insufficient permission to view servers",
-        AvaticaSqlException.class,
-        statement::executeQuery
-    );
+    try (PreparedStatement statement = client.prepareStatement("SELECT 
COUNT(*) AS cnt FROM druid.foo WHERE dim1 = ? OR dim1 = ?")) {
+      statement.setString(1, "abc");
+      statement.setString(2, "def");
+      final ResultSet resultSet = statement.executeQuery();
+      final List<Map<String, Object>> rows = getRows(resultSet);
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of("cnt", 2L)
+          ),
+          rows
+      );
+    }
   }
 
   @Test
-  public void testSysTableParameterBindingSuperUser() throws Exception
+  public void testSysTableParameterBindingRegularUser() throws SQLException
   {
-    PreparedStatement statement =
-        superuserClient.prepareStatement("SELECT COUNT(*) AS cnt FROM 
sys.servers WHERE servers.host = ?");
-    statement.setString(1, "dummy");
-    final ResultSet resultSet = statement.executeQuery();
-    final List<Map<String, Object>> rows = getRows(resultSet);
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableMap.of("cnt", 1L)
-        ),
-        rows
-    );
+    try (PreparedStatement statement =
+        client.prepareStatement("SELECT COUNT(*) AS cnt FROM sys.servers WHERE 
servers.host = ?")) {
+      statement.setString(1, "dummy");
+
+      Assert.assertThrows(
+          "Insufficient permission to view servers",
+          AvaticaSqlException.class,
+          statement::executeQuery
+      );
+    }
   }
 
   @Test
-  public void testExtendedCharacters() throws Exception
+  public void testSysTableParameterBindingSuperUser() throws SQLException
   {
-    final ResultSet resultSet = client.createStatement().executeQuery(
-        "SELECT COUNT(*) AS cnt FROM druid.lotsocolumns WHERE 
dimMultivalEnumerated = 'ㅑ ㅓ ㅕ ㅗ ㅛ ㅜ ㅠ ㅡ ㅣ'"
-    );
-    final List<Map<String, Object>> rows = getRows(resultSet);
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableMap.of("cnt", 1L)
-        ),
-        rows
-    );
+    try (PreparedStatement statement =
+        superuserClient.prepareStatement("SELECT COUNT(*) AS cnt FROM 
sys.servers WHERE servers.host = ?")) {
+      statement.setString(1, "dummy");
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of("cnt", 1L)
+          ),
+          getRows(statement.executeQuery())
+      );
+    }
+  }
 
+  @Test
+  public void testExecuteMany() throws SQLException
+  {
+    try (PreparedStatement statement =
+        superuserClient.prepareStatement("SELECT COUNT(*) AS cnt FROM 
sys.servers WHERE servers.host = ?")) {
+      statement.setString(1, "dummy");
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of("cnt", 1L)
+          ),
+          getRows(statement.executeQuery())
+      );
+      statement.setString(1, "foo");
+      Assert.assertEquals(
+          Collections.emptyList(),
+          getRows(statement.executeQuery())
+      );
+      statement.setString(1, "dummy");
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of("cnt", 1L)
+          ),
+          getRows(statement.executeQuery())
+      );
+    }
+  }
 
-    PreparedStatement statement = client.prepareStatement(
-        "SELECT COUNT(*) AS cnt FROM druid.lotsocolumns WHERE 
dimMultivalEnumerated = ?"
-    );
-    statement.setString(1, "ㅑ ㅓ ㅕ ㅗ ㅛ ㅜ ㅠ ㅡ ㅣ");
-    final ResultSet resultSet2 = statement.executeQuery();
-    final List<Map<String, Object>> rows2 = getRows(resultSet2);
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableMap.of("cnt", 1L)
-        ),
-        rows
-    );
-    Assert.assertEquals(rows, rows2);
+  @Test
+  public void testExtendedCharacters() throws SQLException
+  {
+    try (Statement stmt = client.createStatement()) {
+      final ResultSet resultSet = stmt.executeQuery(
+          "SELECT COUNT(*) AS cnt FROM druid.lotsocolumns WHERE 
dimMultivalEnumerated = 'ㅑ ㅓ ㅕ ㅗ ㅛ ㅜ ㅠ ㅡ ㅣ'"
+      );
+      final List<Map<String, Object>> rows = getRows(resultSet);
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of("cnt", 1L)
+          ),
+          rows
+      );
+    }
+
+    try (PreparedStatement statement = client.prepareStatement(
+        "SELECT COUNT(*) AS cnt FROM druid.lotsocolumns WHERE 
dimMultivalEnumerated = ?")) {
+      statement.setString(1, "ㅑ ㅓ ㅕ ㅗ ㅛ ㅜ ㅠ ㅡ ㅣ");
+      final ResultSet resultSet2 = statement.executeQuery();
+      final List<Map<String, Object>> rows = getRows(resultSet2);
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of("cnt", 1L)
+          ),
+          rows
+      );
+      // NOTE(review): Assert.assertEquals(rows, rows) was a tautology — the refactor scoped the first query's rows out of reach, losing the original cross-check.
+    }
   }
 
   @Test
-  public void testEscapingForGetColumns() throws Exception
+  public void testEscapingForGetColumns() throws SQLException
   {
     final DatabaseMetaData metaData = client.getMetaData();
 
@@ -1325,7 +1444,7 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
   }
 
   @Test
-  public void testEscapingForGetTables() throws Exception
+  public void testEscapingForGetTables() throws SQLException
   {
     final DatabaseMetaData metaData = client.getMetaData();
 
@@ -1374,36 +1493,51 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
     );
   }
 
-
   @Test
-  public void testArrayStuffs() throws Exception
+  public void testArrayStuff() throws SQLException
   {
-    PreparedStatement statement = client.prepareStatement(
-        "SELECT ARRAY_AGG(dim2) AS arr1, ARRAY_AGG(l1) AS arr2, ARRAY_AGG(d1)  
AS arr3, ARRAY_AGG(f1) AS arr4 FROM druid.numfoo"
-    );
-    final ResultSet resultSet = statement.executeQuery();
-    final List<Map<String, Object>> rows = getRows(resultSet);
-    Assert.assertEquals(1, rows.size());
-    Assert.assertTrue(rows.get(0).containsKey("arr1"));
-    Assert.assertTrue(rows.get(0).containsKey("arr2"));
-    Assert.assertTrue(rows.get(0).containsKey("arr3"));
-    Assert.assertTrue(rows.get(0).containsKey("arr4"));
-    if (NullHandling.sqlCompatible()) {
-      Assert.assertArrayEquals(new Object[]{"a", null, "", "a", "abc", null}, 
(Object[]) rows.get(0).get("arr1"));
-      Assert.assertArrayEquals(new Object[]{7L, 325323L, 0L, null, null, 
null}, (Object[]) rows.get(0).get("arr2"));
-      Assert.assertArrayEquals(new Object[]{1.0, 1.7, 0.0, null, null, null}, 
(Object[]) rows.get(0).get("arr3"));
-      Assert.assertArrayEquals(new Object[]{1.0f, 0.1f, 0.0f, null, null, 
null}, (Object[]) rows.get(0).get("arr4"));
-    } else {
-      Assert.assertArrayEquals(new Object[]{"a", null, null, "a", "abc", 
null}, (Object[]) rows.get(0).get("arr1"));
-      Assert.assertArrayEquals(new Object[]{7L, 325323L, 0L, 0L, 0L, 0L}, 
(Object[]) rows.get(0).get("arr2"));
-      Assert.assertArrayEquals(new Object[]{1.0, 1.7, 0.0, 0.0, 0.0, 0.0}, 
(Object[]) rows.get(0).get("arr3"));
-      Assert.assertArrayEquals(new Object[]{1.0f, 0.1f, 0.0f, 0.0f, 0.0f, 
0.0f}, (Object[]) rows.get(0).get("arr4"));
+    try (PreparedStatement statement = client.prepareStatement(
+        "SELECT ARRAY_AGG(dim2) AS arr1, ARRAY_AGG(l1) AS arr2, ARRAY_AGG(d1)  
AS arr3, ARRAY_AGG(f1) AS arr4 FROM druid.numfoo")) {
+      final ResultSet resultSet = statement.executeQuery();
+      final List<Map<String, Object>> rows = getRows(resultSet);
+      Assert.assertEquals(1, rows.size());
+      Assert.assertTrue(rows.get(0).containsKey("arr1"));
+      Assert.assertTrue(rows.get(0).containsKey("arr2"));
+      Assert.assertTrue(rows.get(0).containsKey("arr3"));
+      Assert.assertTrue(rows.get(0).containsKey("arr4"));
+      if (NullHandling.sqlCompatible()) {
+        Assert.assertArrayEquals(new Object[]{"a", null, "", "a", "abc", 
null}, (Object[]) rows.get(0).get("arr1"));
+        Assert.assertArrayEquals(new Object[]{7L, 325323L, 0L, null, null, 
null}, (Object[]) rows.get(0).get("arr2"));
+        Assert.assertArrayEquals(new Object[]{1.0, 1.7, 0.0, null, null, 
null}, (Object[]) rows.get(0).get("arr3"));
+        Assert.assertArrayEquals(new Object[]{1.0f, 0.1f, 0.0f, null, null, 
null}, (Object[]) rows.get(0).get("arr4"));
+      } else {
+        Assert.assertArrayEquals(new Object[]{"a", null, null, "a", "abc", 
null}, (Object[]) rows.get(0).get("arr1"));
+        Assert.assertArrayEquals(new Object[]{7L, 325323L, 0L, 0L, 0L, 0L}, 
(Object[]) rows.get(0).get("arr2"));
+        Assert.assertArrayEquals(new Object[]{1.0, 1.7, 0.0, 0.0, 0.0, 0.0}, 
(Object[]) rows.get(0).get("arr3"));
+        Assert.assertArrayEquals(new Object[]{1.0f, 0.1f, 0.0f, 0.0f, 0.0f, 
0.0f}, (Object[]) rows.get(0).get("arr4"));
+      }
     }
   }
 
-  protected abstract String getJdbcConnectionString(int port);
+  // Default implementation is for JSON to allow debugging of tests.
+  protected String getJdbcConnectionString(final int port)
+  {
+    return StringUtils.format(
+            "jdbc:avatica:remote:url=http://127.0.0.1:%d%s";,
+            port,
+            DruidAvaticaJsonHandler.AVATICA_PATH
+    );
+  }
 
-  protected abstract AbstractAvaticaHandler getAvaticaHandler(DruidMeta 
druidMeta);
+  // Default implementation is for JSON to allow debugging of tests.
+  protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta)
+  {
+    return new DruidAvaticaJsonHandler(
+            druidMeta,
+            new DruidNode("dummy", "dummy", false, 1, null, true, false),
+            new AvaticaMonitor()
+    );
+  }
 
   private static List<Map<String, Object>> getRows(final ResultSet resultSet) 
throws SQLException
   {
@@ -1437,6 +1571,7 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
     }
   }
 
+  @SafeVarargs
   private static Map<String, Object> row(final Pair<String, ?>... entries)
   {
     final Map<String, Object> m = new HashMap<>();
diff --git 
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandlerTest.java
 
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandlerTest.java
deleted file mode 100644
index 1e60905bfb..0000000000
--- 
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandlerTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.sql.avatica;
-
-import org.apache.calcite.avatica.server.AbstractAvaticaHandler;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.server.DruidNode;
-
-public class DruidAvaticaJsonHandlerTest extends DruidAvaticaHandlerTest
-{
-  @Override
-  protected String getJdbcConnectionString(final int port)
-  {
-    return StringUtils.format(
-            "jdbc:avatica:remote:url=http://127.0.0.1:%d%s";,
-            port,
-            DruidAvaticaJsonHandler.AVATICA_PATH
-    );
-  }
-
-  @Override
-  protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta)
-  {
-    return new DruidAvaticaJsonHandler(
-            druidMeta,
-            new DruidNode("dummy", "dummy", false, 1, null, true, false),
-            new AvaticaMonitor()
-    );
-  }
-}
diff --git 
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java 
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
index c6eeb83391..fba413d990 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
@@ -20,20 +20,23 @@
 package org.apache.druid.sql.avatica;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.apache.calcite.avatica.ColumnMetaData;
 import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.remote.TypedValue;
+import org.apache.calcite.tools.RelConversionException;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.math.expr.ExprMacroTable;
-import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.security.AllowAllAuthenticator;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.SqlQueryPlus;
 import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
 import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -58,6 +61,15 @@ import java.util.List;
 
 public class DruidStatementTest extends CalciteTestBase
 {
+  private static final String SUB_QUERY_WITH_ORDER_BY =
+        "select T20.F13 as F22\n"
+      + "from (SELECT DISTINCT dim1 as F13 FROM druid.foo T10) T20\n"
+      + "order by T20.F13 ASC";
+  private static final String SELECT_FROM_FOO =
+      "SELECT __time, cnt, dim1, dim2, m1 FROM druid.foo";
+  private static final String SELECT_STAR_FROM_FOO =
+      "SELECT * FROM druid.foo";
+
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
@@ -82,6 +94,7 @@ public class DruidStatementTest extends CalciteTestBase
 
   private SpecificSegmentsQuerySegmentWalker walker;
   private SqlLifecycleFactory sqlLifecycleFactory;
+  private DruidConnection conn;
 
   @Before
   public void setUp() throws Exception
@@ -103,89 +116,200 @@ public class DruidStatementTest extends CalciteTestBase
         CalciteTests.DRUID_SCHEMA_NAME,
         new CalciteRulesManager(ImmutableSet.of())
     );
-    this.sqlLifecycleFactory = 
CalciteTests.createSqlLifecycleFactory(plannerFactory);
+    sqlLifecycleFactory = 
CalciteTests.createSqlLifecycleFactory(plannerFactory);
+    conn = new DruidConnection("dummy", 4, ImmutableMap.of(), 
ImmutableMap.of());
   }
 
   @After
   public void tearDown() throws Exception
   {
+    conn.close();
     walker.close();
     walker = null;
   }
 
+  //-----------------------------------------------------------------
+  // Druid JDBC Statement
+  //
+  // The JDBC Statement class starts "empty", then allows executing
+  // one statement at a time. Executing a second automatically closes
+  // the result set from the first. Each statement takes a new query.
+  // Parameters are not generally used in this pattern.
+
+  private DruidJdbcStatement jdbcStatement()
+  {
+    return new DruidJdbcStatement(
+        conn,
+        0,
+        sqlLifecycleFactory
+    );
+  }
+
   @Test
-  public void testSignature()
+  public void testSubQueryWithOrderByDirect() throws RelConversionException
   {
-    final String sql = "SELECT * FROM druid.foo";
-    try (final DruidStatement statement = statement(sql)) {
-      // Check signature.
-      final Meta.Signature signature = statement.getSignature();
-      Assert.assertEquals(Meta.CursorFactory.ARRAY, signature.cursorFactory);
-      Assert.assertEquals(Meta.StatementType.SELECT, signature.statementType);
-      Assert.assertEquals(sql, signature.sql);
+    SqlQueryPlus queryPlus = new SqlQueryPlus(
+        SUB_QUERY_WITH_ORDER_BY,
+        null,
+        null,
+        AllowAllAuthenticator.ALLOW_ALL_RESULT
+    );
+    try (final DruidJdbcStatement statement = jdbcStatement()) {
+      // First frame, ask for all rows.
+      statement.execute(queryPlus, -1);
+      Meta.Frame frame = 
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
       Assert.assertEquals(
-          Lists.newArrayList(
-              Lists.newArrayList("__time", "TIMESTAMP", "java.lang.Long"),
-              Lists.newArrayList("dim1", "VARCHAR", "java.lang.String"),
-              Lists.newArrayList("dim2", "VARCHAR", "java.lang.String"),
-              Lists.newArrayList("dim3", "VARCHAR", "java.lang.String"),
-              Lists.newArrayList("cnt", "BIGINT", "java.lang.Number"),
-              Lists.newArrayList("m1", "FLOAT", "java.lang.Float"),
-              Lists.newArrayList("m2", "DOUBLE", "java.lang.Double"),
-              Lists.newArrayList("unique_dim1", "OTHER", "java.lang.Object")
-          ),
-          Lists.transform(
-              signature.columns,
-              new Function<ColumnMetaData, List<String>>()
-              {
-                @Override
-                public List<String> apply(final ColumnMetaData columnMetaData)
-                {
-                  return Lists.newArrayList(
-                      columnMetaData.label,
-                      columnMetaData.type.name,
-                      columnMetaData.type.rep.clazz.getName()
-                  );
-                }
-              }
-          )
+          subQueryWithOrderByResults(),
+          frame
       );
+      Assert.assertTrue(statement.isDone());
     }
   }
 
   @Test
-  public void testSubQueryWithOrderBy()
+  public void testFetchPastEOFDirect() throws RelConversionException
   {
-    final String sql = "select T20.F13 as F22  from (SELECT DISTINCT dim1 as 
F13 FROM druid.foo T10) T20 order by T20.F13 ASC";
-    try (final DruidStatement statement = statement(sql)) {
+    SqlQueryPlus queryPlus = new SqlQueryPlus(
+        SUB_QUERY_WITH_ORDER_BY,
+        null,
+        null,
+        AllowAllAuthenticator.ALLOW_ALL_RESULT
+    );
+    try (final DruidJdbcStatement statement = jdbcStatement()) {
       // First frame, ask for all rows.
-      Meta.Frame frame = 
statement.execute(Collections.emptyList()).nextFrame(DruidStatement.START_OFFSET,
 6);
+      statement.execute(queryPlus, -1);
+      Meta.Frame frame = 
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
       Assert.assertEquals(
-          Meta.Frame.create(
-              0,
-              true,
-              Lists.newArrayList(
-                  new Object[]{""},
-                  new Object[]{"1"},
-                  new Object[]{"10.1"},
-                  new Object[]{"2"},
-                  new Object[]{"abc"},
-                  new Object[]{"def"}
-              )
-          ),
+          subQueryWithOrderByResults(),
           frame
       );
       Assert.assertTrue(statement.isDone());
+      try {
+        statement.nextFrame(6, 6);
+        Assert.fail();
+      }
+      catch (Exception e) {
+        // Expected: can't work with an auto-closed result set.
+      }
     }
   }
 
+  /**
+   * Ensure an error is thrown if the execution step is skipped.
+   */
   @Test
-  public void testSelectAllInFirstFrame()
+  public void testSkipExecuteDirect()
   {
-    final String sql = "SELECT __time, cnt, dim1, dim2, m1 FROM druid.foo";
-    try (final DruidStatement statement = statement(sql)) {
+    try (final DruidJdbcStatement statement = jdbcStatement()) {
+      // Error: no call to execute;
+      statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+      Assert.fail();
+    }
+    catch (Exception e) {
+      // Expected
+    }
+  }
+
+  @Test
+  public void testSignatureDirect() throws RelConversionException
+  {
+    SqlQueryPlus queryPlus = new SqlQueryPlus(
+        SELECT_STAR_FROM_FOO,
+        null,
+        null,
+        AllowAllAuthenticator.ALLOW_ALL_RESULT
+    );
+    try (final DruidJdbcStatement statement = jdbcStatement()) {
+      // Check signature.
+      statement.execute(queryPlus, -1);
+      verifySignature(statement.getSignature());
+    }
+  }
+
+  /**
+   * Ensure an error is thrown if the client attempts to fetch from a
+   * statement after its result set is closed.
+   */
+  @Test
+  public void testFetchAfterResultCloseDirect()
+  {
+    SqlQueryPlus queryPlus = new SqlQueryPlus(
+        SUB_QUERY_WITH_ORDER_BY,
+        null,
+        null,
+        AllowAllAuthenticator.ALLOW_ALL_RESULT
+    );
+    try (final DruidJdbcStatement statement = jdbcStatement()) {
       // First frame, ask for all rows.
-      Meta.Frame frame = 
statement.execute(Collections.emptyList()).nextFrame(DruidStatement.START_OFFSET,
 6);
+      statement.execute(queryPlus, -1);
+      statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+      statement.closeResultSet();
+      statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+      Assert.fail();
+    }
+    catch (Exception e) {
+      // Expected
+    }
+  }
+
+  @Test
+  public void testSubQueryWithOrderByDirectTwice() throws 
RelConversionException
+  {
+    SqlQueryPlus queryPlus = new SqlQueryPlus(
+        SUB_QUERY_WITH_ORDER_BY,
+        null,
+        null,
+        AllowAllAuthenticator.ALLOW_ALL_RESULT
+    );
+    try (final DruidJdbcStatement statement = jdbcStatement()) {
+      statement.execute(queryPlus, -1);
+      Meta.Frame frame = 
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+      Assert.assertEquals(
+          subQueryWithOrderByResults(),
+          frame
+      );
+
+      // Do it again. JDBC says we can reuse statements sequentially.
+      Assert.assertTrue(statement.isDone());
+      statement.execute(queryPlus, -1);
+      frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+      Assert.assertEquals(
+          subQueryWithOrderByResults(),
+          frame
+      );
+      Assert.assertTrue(statement.isDone());
+    }
+  }
+
+  private Meta.Frame subQueryWithOrderByResults()
+  {
+    return Meta.Frame.create(
+        0,
+        true,
+        Lists.newArrayList(
+            new Object[]{""},
+            new Object[]{"1"},
+            new Object[]{"10.1"},
+            new Object[]{"2"},
+            new Object[]{"abc"},
+            new Object[]{"def"}
+        )
+    );
+  }
+
+  @Test
+  public void testSelectAllInFirstFrameDirect() throws RelConversionException
+  {
+    SqlQueryPlus queryPlus = new SqlQueryPlus(
+        SELECT_FROM_FOO,
+        null,
+        null,
+        AllowAllAuthenticator.ALLOW_ALL_RESULT
+    );
+    try (final DruidJdbcStatement statement = jdbcStatement()) {
+      // First frame, ask for all rows.
+      statement.execute(queryPlus, -1);
+      Meta.Frame frame = 
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
       Assert.assertEquals(
           Meta.Frame.create(
               0,
@@ -211,59 +335,309 @@ public class DruidStatementTest extends CalciteTestBase
     }
   }
 
+  /**
+   * Test results spread over two frames. Also checks various state-related
+   * methods.
+   * @throws RelConversionException
+   */
   @Test
-  public void testSelectSplitOverTwoFrames()
+  public void testSelectSplitOverTwoFramesDirect() throws 
RelConversionException
   {
-    final String sql = "SELECT __time, cnt, dim1, dim2, m1 FROM druid.foo";
-    try (final DruidStatement statement = statement(sql)) {
+    SqlQueryPlus queryPlus = new SqlQueryPlus(
+        SELECT_FROM_FOO,
+        null,
+        null,
+        AllowAllAuthenticator.ALLOW_ALL_RESULT
+    );
+    try (final DruidJdbcStatement statement = jdbcStatement()) {
+
       // First frame, ask for 2 rows.
-      Meta.Frame frame = 
statement.execute(Collections.emptyList()).nextFrame(DruidStatement.START_OFFSET,
 2);
+      statement.execute(queryPlus, -1);
+      Assert.assertEquals(0, statement.getCurrentOffset());
+      Assert.assertFalse(statement.isDone());
+      Meta.Frame frame = 
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 2);
       Assert.assertEquals(
-          Meta.Frame.create(
-              0,
-              false,
-              Lists.newArrayList(
-                  new Object[]{DateTimes.of("2000-01-01").getMillis(), 1L, "", 
"a", 1.0f},
-                  new Object[]{
-                      DateTimes.of("2000-01-02").getMillis(),
-                      1L,
-                      "10.1",
-                      NullHandling.defaultStringValue(),
-                      2.0f
-                  }
-              )
-          ),
+          firstFrameResults(),
           frame
       );
       Assert.assertFalse(statement.isDone());
+      Assert.assertEquals(2, statement.getCurrentOffset());
 
       // Last frame, ask for all remaining rows.
       frame = statement.nextFrame(2, 10);
       Assert.assertEquals(
-          Meta.Frame.create(
-              2,
-              true,
-              Lists.newArrayList(
-                  new Object[]{DateTimes.of("2000-01-03").getMillis(), 1L, 
"2", "", 3.0f},
-                  new Object[]{DateTimes.of("2001-01-01").getMillis(), 1L, 
"1", "a", 4.0f},
-                  new Object[]{DateTimes.of("2001-01-02").getMillis(), 1L, 
"def", "abc", 5.0f},
-                  new Object[]{DateTimes.of("2001-01-03").getMillis(), 1L, 
"abc", NullHandling.defaultStringValue(), 6.0f}
-              )
-          ),
+          secondFrameResults(),
+          frame
+      );
+      Assert.assertTrue(statement.isDone());
+    }
+  }
+
+  /**
+   * Verify that JDBC automatically closes the first result set when we
+   * open a second for the same statement.
+   * @throws RelConversionException
+   */
+  @Test
+  public void testTwoFramesAutoCloseDirect() throws RelConversionException
+  {
+    SqlQueryPlus queryPlus = new SqlQueryPlus(
+        SELECT_FROM_FOO,
+        null,
+        null,
+        AllowAllAuthenticator.ALLOW_ALL_RESULT
+    );
+    try (final DruidJdbcStatement statement = jdbcStatement()) {
+      // First frame, ask for 2 rows.
+      statement.execute(queryPlus, -1);
+      Meta.Frame frame = 
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 2);
+      Assert.assertEquals(
+          firstFrameResults(),
+          frame
+      );
+      Assert.assertFalse(statement.isDone());
+
+      // Do it again. Closes the prior result set.
+      statement.execute(queryPlus, -1);
+      frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 2);
+      Assert.assertEquals(
+          firstFrameResults(),
+          frame
+      );
+      Assert.assertFalse(statement.isDone());
+
+      // Last frame, ask for all remaining rows.
+      frame = statement.nextFrame(2, 10);
+      Assert.assertEquals(
+          secondFrameResults(),
           frame
       );
       Assert.assertTrue(statement.isDone());
     }
   }
 
-  private DruidStatement statement(String sql)
+  /**
+   * Test that closing a statement with pending results automatically
+   * closes the underlying result set.
+   * @throws RelConversionException
+   */
+  @Test
+  public void testTwoFramesCloseWithResultSetDirect() throws 
RelConversionException
   {
-    return new DruidStatement(
-        "",
+    SqlQueryPlus queryPlus = new SqlQueryPlus(
+        SELECT_FROM_FOO,
+        null,
+        null,
+        AllowAllAuthenticator.ALLOW_ALL_RESULT
+    );
+    try (final DruidJdbcStatement statement = jdbcStatement()) {
+      // First frame, ask for 2 rows.
+      statement.execute(queryPlus, -1);
+      Meta.Frame frame = 
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 2);
+      Assert.assertEquals(
+          firstFrameResults(),
+          frame
+      );
+      Assert.assertFalse(statement.isDone());
+
+      // Leave result set open; close statement.
+    }
+  }
+
+  private Meta.Frame firstFrameResults()
+  {
+    return Meta.Frame.create(
+        0,
+        false,
+        Lists.newArrayList(
+            new Object[]{DateTimes.of("2000-01-01").getMillis(), 1L, "", "a", 
1.0f},
+            new Object[]{
+                DateTimes.of("2000-01-02").getMillis(),
+                1L,
+                "10.1",
+                NullHandling.defaultStringValue(),
+                2.0f
+            }
+        )
+    );
+  }
+
+  private Meta.Frame secondFrameResults()
+  {
+    return Meta.Frame.create(
+        2,
+        true,
+        Lists.newArrayList(
+            new Object[]{DateTimes.of("2000-01-03").getMillis(), 1L, "2", "", 
3.0f},
+            new Object[]{DateTimes.of("2001-01-01").getMillis(), 1L, "1", "a", 
4.0f},
+            new Object[]{DateTimes.of("2001-01-02").getMillis(), 1L, "def", 
"abc", 5.0f},
+            new Object[]{DateTimes.of("2001-01-03").getMillis(), 1L, "abc", 
NullHandling.defaultStringValue(), 6.0f}
+        )
+    );
+  }
+
+  @SuppressWarnings("unchecked")
+  private void verifySignature(Meta.Signature signature)
+  {
+    Assert.assertEquals(Meta.CursorFactory.ARRAY, signature.cursorFactory);
+    Assert.assertEquals(Meta.StatementType.SELECT, signature.statementType);
+    Assert.assertEquals(SELECT_STAR_FROM_FOO, signature.sql);
+    Assert.assertEquals(
+        Lists.newArrayList(
+            Lists.newArrayList("__time", "TIMESTAMP", "java.lang.Long"),
+            Lists.newArrayList("dim1", "VARCHAR", "java.lang.String"),
+            Lists.newArrayList("dim2", "VARCHAR", "java.lang.String"),
+            Lists.newArrayList("dim3", "VARCHAR", "java.lang.String"),
+            Lists.newArrayList("cnt", "BIGINT", "java.lang.Number"),
+            Lists.newArrayList("m1", "FLOAT", "java.lang.Float"),
+            Lists.newArrayList("m2", "DOUBLE", "java.lang.Double"),
+            Lists.newArrayList("unique_dim1", "OTHER", "java.lang.Object")
+        ),
+        Lists.transform(
+            signature.columns,
+            new Function<ColumnMetaData, List<String>>()
+            {
+              @Override
+              public List<String> apply(final ColumnMetaData columnMetaData)
+              {
+                return Lists.newArrayList(
+                    columnMetaData.label,
+                    columnMetaData.type.name,
+                    columnMetaData.type.rep.clazz.getName()
+                );
+              }
+            }
+        )
+    );
+  }
+
+  //-----------------------------------------------------------------
+  // Druid JDBC Prepared Statement
+  //
+  // The JDBC PreparedStatement class starts with a prepare step, then
+  // allows executing the statement sequentially, typically with a set
+  // of parameters.
+
+  private DruidJdbcPreparedStatement jdbcPreparedStatement(SqlQueryPlus 
queryPlus)
+  {
+    return new DruidJdbcPreparedStatement(
+        conn,
         0,
-        new QueryContext(),
-        sqlLifecycleFactory.factorize(),
-        () -> {}
-    ).prepare(sql, -1, AllowAllAuthenticator.ALLOW_ALL_RESULT);
+        queryPlus,
+        sqlLifecycleFactory,
+        Long.MAX_VALUE
+    );
+  }
+
+  @Test
+  public void testSubQueryWithOrderByPrepared()
+  {
+    final String sql = "select T20.F13 as F22  from (SELECT DISTINCT dim1 as 
F13 FROM druid.foo T10) T20 order by T20.F13 ASC";
+    SqlQueryPlus queryPlus = new SqlQueryPlus(
+        sql,
+        null,
+        null,
+        AllowAllAuthenticator.ALLOW_ALL_RESULT
+    );
+    try (final DruidJdbcPreparedStatement statement = 
jdbcPreparedStatement(queryPlus)) {
+      statement.prepare();
+      // First frame, ask for all rows.
+      statement.execute(Collections.emptyList());
+      Meta.Frame frame = 
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+      Assert.assertEquals(
+          subQueryWithOrderByResults(),
+          frame
+      );
+      Assert.assertTrue(statement.isDone());
+    }
+  }
+
+  @Test
+  public void testSubQueryWithOrderByPreparedTwice()
+  {
+    final String sql = "select T20.F13 as F22  from (SELECT DISTINCT dim1 as 
F13 FROM druid.foo T10) T20 order by T20.F13 ASC";
+    SqlQueryPlus queryPlus = new SqlQueryPlus(
+        sql,
+        null,
+        null,
+        AllowAllAuthenticator.ALLOW_ALL_RESULT
+    );
+    try (final DruidJdbcPreparedStatement statement = 
jdbcPreparedStatement(queryPlus)) {
+      statement.prepare();
+      statement.execute(Collections.emptyList());
+      Meta.Frame frame = 
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+      Assert.assertEquals(
+          subQueryWithOrderByResults(),
+          frame
+      );
+
+      // Do it again. JDBC says we can reuse prepared statements sequentially.
+      Assert.assertTrue(statement.isDone());
+      statement.execute(Collections.emptyList());
+      frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+      Assert.assertEquals(
+          subQueryWithOrderByResults(),
+          frame
+      );
+      Assert.assertTrue(statement.isDone());
+    }
+  }
+
+  @Test
+  public void testSignaturePrepared()
+  {
+    SqlQueryPlus queryPlus = new SqlQueryPlus(
+        SELECT_STAR_FROM_FOO,
+        null,
+        null,
+        AllowAllAuthenticator.ALLOW_ALL_RESULT
+    );
+    try (final DruidJdbcPreparedStatement statement = 
jdbcPreparedStatement(queryPlus)) {
+      statement.prepare();
+      verifySignature(statement.getSignature());
+    }
+  }
+
+  @Test
+  public void testParameters()
+  {
+    SqlQueryPlus queryPlus = new SqlQueryPlus(
+        "SELECT COUNT(*) AS cnt FROM sys.servers WHERE servers.host = ?",
+        null,
+        null,
+        AllowAllAuthenticator.ALLOW_ALL_RESULT
+    );
+    Meta.Frame expected = Meta.Frame.create(0, true, 
Collections.singletonList(new Object[] {1L}));
+    List<TypedValue> matchingParams = 
Collections.singletonList(TypedValue.ofLocal(ColumnMetaData.Rep.STRING, 
"dummy"));
+    try (final DruidJdbcPreparedStatement statement = 
jdbcPreparedStatement(queryPlus)) {
+
+      // PreparedStatement protocol: prepare once...
+      statement.prepare();
+
+      // Execute many times. First time.
+      statement.execute(matchingParams);
+      Meta.Frame frame = 
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+      Assert.assertEquals(
+          expected,
+          frame
+      );
+
+      // Again, same value.
+      statement.execute(matchingParams);
+      frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+      Assert.assertEquals(
+          expected,
+          frame
+      );
+
+      // Again, no matches.
+      statement.execute(
+          Collections.singletonList(
+              TypedValue.ofLocal(ColumnMetaData.Rep.STRING, "foo")));
+      frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+      Assert.assertEquals(
+          Meta.Frame.create(0, true, Collections.emptyList()),
+          frame
+      );
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to