This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a8b155e9c6 Fixes for the Avatica JDBC driver (#12709)
a8b155e9c6 is described below
commit a8b155e9c6e4699d30ba2386c6ec88a3004eb819
Author: Paul Rogers <[email protected]>
AuthorDate: Wed Jul 27 15:22:40 2022 -0700
Fixes for the Avatica JDBC driver (#12709)
* Fixes for the Avatica JDBC driver
Correctly implement regular and prepared statements
Correctly implement result sets
Fix race condition with contexts
Clarify when parameters are used
Prepare for single-pass through the planner
* Addressed review comments
* Addressed review comment
---
.../security/ITBasicAuthConfigurationTest.java | 2 +-
.../security/ITBasicAuthLdapConfigurationTest.java | 2 +-
.../java/org/apache/druid/sql/SqlLifecycle.java | 2 +-
.../java/org/apache/druid/sql/SqlQueryPlus.java | 128 +++++
.../sql/avatica/AbstractDruidJdbcStatement.java | 257 +++++++++
.../apache/druid/sql/avatica/DruidConnection.java | 130 ++++-
.../sql/avatica/DruidJdbcPreparedStatement.java | 160 ++++++
.../druid/sql/avatica/DruidJdbcResultSet.java | 250 ++++++++
.../druid/sql/avatica/DruidJdbcStatement.java | 79 +++
.../org/apache/druid/sql/avatica/DruidMeta.java | 188 +++---
.../apache/druid/sql/avatica/DruidStatement.java | 449 ---------------
.../druid/sql/avatica/DruidAvaticaHandlerTest.java | 635 +++++++++++++--------
.../sql/avatica/DruidAvaticaJsonHandlerTest.java | 47 --
.../druid/sql/avatica/DruidStatementTest.java | 554 +++++++++++++++---
14 files changed, 1925 insertions(+), 958 deletions(-)
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
index 562ca66de0..7acf915954 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
@@ -48,7 +48,7 @@ public class ITBasicAuthConfigurationTest extends
AbstractAuthConfigurationTest
private static final String BASIC_AUTHORIZER = "basic";
private static final String EXPECTED_AVATICA_AUTH_ERROR = "Error while
executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver
error: QueryInterruptedException: User metadata store authentication failed. ->
BasicSecurityAuthenticationException: User metadata store authentication
failed.";
- private static final String EXPECTED_AVATICA_AUTHZ_ERROR = "Error while
executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver
error: RuntimeException: org.apache.druid.server.security.ForbiddenException:
Allowed:false, Message: -> ForbiddenException: Allowed:false, Message:";
+ private static final String EXPECTED_AVATICA_AUTHZ_ERROR = "Error while
executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver
error: ForbiddenException: Allowed:false, Message:";
private HttpClient druid99;
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
index a3e7291bb2..f174cdf8fa 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java
@@ -54,7 +54,7 @@ public class ITBasicAuthLdapConfigurationTest extends
AbstractAuthConfigurationT
private static final String LDAP_AUTHORIZER = "ldapauth";
private static final String EXPECTED_AVATICA_AUTH_ERROR = "Error while
executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver
error: QueryInterruptedException: User LDAP authentication failed. ->
BasicSecurityAuthenticationException: User LDAP authentication failed.";
- private static final String EXPECTED_AVATICA_AUTHZ_ERROR = "Error while
executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver
error: RuntimeException: org.apache.druid.server.security.ForbiddenException:
Allowed:false, Message: -> ForbiddenException: Allowed:false, Message:";
+ private static final String EXPECTED_AVATICA_AUTHZ_ERROR = "Error while
executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver
error: ForbiddenException: Allowed:false, Message:";
@Inject
IntegrationTestingConfig config;
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
index 9faca5c600..002fd3045a 100644
--- a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
+++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
@@ -170,7 +170,7 @@ public class SqlLifecycle
}
/**
- * Assign dynamic parameters to be used to substitute values during query
exection. This can be performed at any
+ * Assign dynamic parameters to be used to substitute values during query
execution. This can be performed at any
* part of the lifecycle.
*/
public void setParameters(List<TypedValue> parameters)
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlQueryPlus.java
b/sql/src/main/java/org/apache/druid/sql/SqlQueryPlus.java
new file mode 100644
index 0000000000..bebf74b1a3
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/SqlQueryPlus.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.avatica.remote.TypedValue;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.sql.http.SqlParameter;
+import org.apache.druid.sql.http.SqlQuery;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Captures the inputs to a SQL execution request: the statement,
+ * the context, parameters, and the authorization result. Pass this
+ * around rather than the quad of items. The request can evolve:
+ * items can be filled in later as needed (except for the SQL
+ * and auth result, which are required.)
+ */
+public class SqlQueryPlus
+{
+ private final String sql;
+ private final QueryContext queryContext;
+ private final List<TypedValue> parameters;
+ private final AuthenticationResult authResult;
+
+ public SqlQueryPlus(
+ String sql,
+ QueryContext queryContext,
+ List<TypedValue> parameters,
+ AuthenticationResult authResult
+ )
+ {
+ this.sql = Preconditions.checkNotNull(sql);
+ this.queryContext = queryContext == null
+ ? new QueryContext()
+ : queryContext;
+ this.parameters = parameters == null
+ ? Collections.emptyList()
+ : parameters;
+ this.authResult = Preconditions.checkNotNull(authResult);
+ }
+
+ public SqlQueryPlus(final String sql, final AuthenticationResult authResult)
+ {
+ this(sql, (QueryContext) null, null, authResult);
+ }
+
+ public static SqlQueryPlus fromSqlParameters(
+ String sql,
+ Map<String, Object> queryContext,
+ List<SqlParameter> parameters,
+ AuthenticationResult authResult
+ )
+ {
+ return new SqlQueryPlus(
+ sql,
+ queryContext == null ? null : new QueryContext(queryContext),
+ parameters == null ? null : SqlQuery.getParameterList(parameters),
+ authResult
+ );
+ }
+
+ public static SqlQueryPlus from(
+ String sql,
+ Map<String, Object> queryContext,
+ List<TypedValue> parameters,
+ AuthenticationResult authResult
+ )
+ {
+ return new SqlQueryPlus(
+ sql,
+ queryContext == null ? null : new QueryContext(queryContext),
+ parameters,
+ authResult
+ );
+ }
+
+ public String sql()
+ {
+ return sql;
+ }
+
+ public QueryContext context()
+ {
+ return queryContext;
+ }
+
+ public List<TypedValue> parameters()
+ {
+ return parameters;
+ }
+
+ public AuthenticationResult authResult()
+ {
+ return authResult;
+ }
+
+ public SqlQueryPlus withContext(QueryContext context)
+ {
+ return new SqlQueryPlus(sql, context, parameters, authResult);
+ }
+
+ public SqlQueryPlus withParameters(List<TypedValue> parameters)
+ {
+ return new SqlQueryPlus(sql, queryContext, parameters, authResult);
+ }
+}
diff --git
a/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
b/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
new file mode 100644
index 0000000000..399ecf673f
--- /dev/null
+++
b/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.avatica;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.avatica.AvaticaParameter;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.PrepareResult;
+
+import java.io.Closeable;
+import java.sql.Array;
+import java.sql.DatabaseMetaData;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Common implementation for the JDBC {@code Statement} and
+ * {@code PreparedStatement} implementations in Druid. Statement use
+ * {@link DruidJdbcResultSet} objects to iterate through rows: zero
+ * or one may be open at any time, and a single statement supports
+ * multiple result sets concurrently. Druid closes the result set after
+ * the last batch in compliance with this note on page 137 of the
+ * <a
href="https://download.oracle.com/otn-pub/jcp/jdbc-4_1-mrel-spec/jdbc4.1-fr-spec.pdf?AuthParam=1655858260_d6388aeec9aeb8e5616d221749b1ff71">
+ * JDBC 4.1 specification</a>:
+ * <p>
+ * <i>Some JDBC driver implementations may also implicitly close the
+ * ResultSet when the ResultSet type is TYPE_FORWARD_ONLY and the next
+ * method of ResultSet returns false.</i>
+ */
+public abstract class AbstractDruidJdbcStatement implements Closeable
+{
+ public static final long START_OFFSET = 0;
+
+ protected final DruidConnection connection;
+ protected final int statementId;
+ protected DruidJdbcResultSet resultSet;
+
+ public AbstractDruidJdbcStatement(
+ final DruidConnection connection,
+ final int statementId
+ )
+ {
+ this.connection = Preconditions.checkNotNull(connection, "connection");
+ this.statementId = statementId;
+ }
+
+ protected static Meta.Signature createSignature(PrepareResult prepareResult,
String sql)
+ {
+ List<AvaticaParameter> params = new ArrayList<>();
+ final RelDataType parameterRowType = prepareResult.getParameterRowType();
+ for (RelDataTypeField field : parameterRowType.getFieldList()) {
+ RelDataType type = field.getType();
+ params.add(createParameter(field, type));
+ }
+ return Meta.Signature.create(
+ createColumnMetaData(prepareResult.getRowType()),
+ sql,
+ params,
+ Meta.CursorFactory.ARRAY,
+ Meta.StatementType.SELECT // We only support SELECT
+ );
+ }
+
+ private static AvaticaParameter createParameter(RelDataTypeField field,
RelDataType type)
+ {
+ // signed is always false because no way to extract from RelDataType, and
the only usage of this AvaticaParameter
+ // constructor I can find, in CalcitePrepareImpl, does it this way with
hard coded false
+ return new AvaticaParameter(
+ false,
+ type.getPrecision(),
+ type.getScale(),
+ type.getSqlTypeName().getJdbcOrdinal(),
+ type.getSqlTypeName().getName(),
+ Calcites.sqlTypeNameJdbcToJavaClass(type.getSqlTypeName()).getName(),
+ field.getName()
+ );
+ }
+
+ public static List<ColumnMetaData> createColumnMetaData(final RelDataType
rowType)
+ {
+ final List<ColumnMetaData> columns = new ArrayList<>();
+ List<RelDataTypeField> fieldList = rowType.getFieldList();
+
+ for (int i = 0; i < fieldList.size(); i++) {
+ RelDataTypeField field = fieldList.get(i);
+
+ final ColumnMetaData.AvaticaType columnType;
+ if (field.getType().getSqlTypeName() == SqlTypeName.ARRAY) {
+ final ColumnMetaData.Rep elementRep =
rep(field.getType().getComponentType().getSqlTypeName());
+ final ColumnMetaData.ScalarType elementType = ColumnMetaData.scalar(
+
field.getType().getComponentType().getSqlTypeName().getJdbcOrdinal(),
+ field.getType().getComponentType().getSqlTypeName().getName(),
+ elementRep
+ );
+ final ColumnMetaData.Rep arrayRep =
rep(field.getType().getSqlTypeName());
+ columnType = ColumnMetaData.array(
+ elementType,
+ field.getType().getSqlTypeName().getName(),
+ arrayRep
+ );
+ } else {
+ final ColumnMetaData.Rep rep = rep(field.getType().getSqlTypeName());
+ columnType = ColumnMetaData.scalar(
+ field.getType().getSqlTypeName().getJdbcOrdinal(),
+ field.getType().getSqlTypeName().getName(),
+ rep
+ );
+ }
+ columns.add(
+ new ColumnMetaData(
+ i, // ordinal
+ false, // auto increment
+ true, // case sensitive
+ false, // searchable
+ false, // currency
+ field.getType().isNullable()
+ ? DatabaseMetaData.columnNullable
+ : DatabaseMetaData.columnNoNulls, // nullable
+ true, // signed
+ field.getType().getPrecision(), // display size
+ field.getName(), // label
+ null, // column name
+ null, // schema name
+ field.getType().getPrecision(), // precision
+ field.getType().getScale(), // scale
+ null, // table name
+ null, // catalog name
+ columnType, // avatica type
+ true, // read only
+ false, // writable
+ false, // definitely writable
+ columnType.columnClassName() // column class name
+ )
+ );
+ }
+
+ return columns;
+ }
+
+ private static ColumnMetaData.Rep rep(final SqlTypeName sqlType)
+ {
+ if (SqlTypeName.CHAR_TYPES.contains(sqlType)) {
+ return ColumnMetaData.Rep.of(String.class);
+ } else if (sqlType == SqlTypeName.TIMESTAMP) {
+ return ColumnMetaData.Rep.of(Long.class);
+ } else if (sqlType == SqlTypeName.DATE) {
+ return ColumnMetaData.Rep.of(Integer.class);
+ } else if (sqlType == SqlTypeName.INTEGER) {
+ // use Number.class for exact numeric types since JSON transport might
switch longs to integers
+ return ColumnMetaData.Rep.of(Number.class);
+ } else if (sqlType == SqlTypeName.BIGINT) {
+ // use Number.class for exact numeric types since JSON transport might
switch longs to integers
+ return ColumnMetaData.Rep.of(Number.class);
+ } else if (sqlType == SqlTypeName.FLOAT) {
+ return ColumnMetaData.Rep.of(Float.class);
+ } else if (sqlType == SqlTypeName.DOUBLE || sqlType ==
SqlTypeName.DECIMAL) {
+ return ColumnMetaData.Rep.of(Double.class);
+ } else if (sqlType == SqlTypeName.BOOLEAN) {
+ return ColumnMetaData.Rep.of(Boolean.class);
+ } else if (sqlType == SqlTypeName.OTHER) {
+ return ColumnMetaData.Rep.of(Object.class);
+ } else if (sqlType == SqlTypeName.ARRAY) {
+ return ColumnMetaData.Rep.of(Array.class);
+ } else {
+ throw new ISE("No rep for SQL type [%s]", sqlType);
+ }
+ }
+
+ public Meta.Frame nextFrame(final long fetchOffset, final int
fetchMaxRowCount)
+ {
+ Meta.Frame frame = requireResultSet().nextFrame(fetchOffset,
fetchMaxRowCount);
+
+ // Implicitly close after the last result frame.
+ if (frame.done) {
+ closeResultSet();
+ }
+ return frame;
+ }
+
+ public abstract Meta.Signature getSignature();
+
+ public void closeResultSet()
+ {
+ // Lock held only to get the result set, not during cleanup.
+ DruidJdbcResultSet currentResultSet;
+ synchronized (this) {
+ currentResultSet = resultSet;
+ resultSet = null;
+ }
+ if (currentResultSet != null) {
+ currentResultSet.close();
+ }
+ }
+
+ protected synchronized DruidJdbcResultSet requireResultSet()
+ {
+ if (resultSet == null) {
+ throw new ISE("No result set open for statement [%d]", statementId);
+ }
+ return resultSet;
+ }
+
+ public long getCurrentOffset()
+ {
+ return requireResultSet().getCurrentOffset();
+ }
+
+ public synchronized boolean isDone()
+ {
+ return resultSet == null ? true : resultSet.isDone();
+ }
+
+ @Override
+ public synchronized void close()
+ {
+ closeResultSet();
+ }
+
+ public String getConnectionId()
+ {
+ return connection.getConnectionId();
+ }
+
+ public int getStatementId()
+ {
+ return statementId;
+ }
+
+ public ExecutorService executor()
+ {
+ return connection.executor();
+ }
+}
diff --git
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
index 2cd277f1c5..ab6ae65a98 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
@@ -20,17 +20,22 @@
package org.apache.druid.sql.avatica;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.calcite.tools.RelConversionException;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryContext;
import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.SqlQueryPlus;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -44,15 +49,14 @@ public class DruidConnection
private final String connectionId;
private final int maxStatements;
- private final ImmutableMap<String, Object> userSecret;
- private final QueryContext context;
+ private final Map<String, Object> userSecret;
+ private final Map<String, Object> context;
private final AtomicInteger statementCounter = new AtomicInteger();
private final AtomicReference<Future<?>> timeoutFuture = new
AtomicReference<>();
+ private final ExecutorService yielderOpenCloseExecutor;
- // Typically synchronized by connectionLock, except in one case: the onClose
function passed
- // into DruidStatements contained by the map.
@GuardedBy("connectionLock")
- private final ConcurrentMap<Integer, DruidStatement> statements;
+ private final ConcurrentMap<Integer, AbstractDruidJdbcStatement> statements
= new ConcurrentHashMap<>();
private final Object connectionLock = new Object();
@GuardedBy("connectionLock")
@@ -62,14 +66,19 @@ public class DruidConnection
final String connectionId,
final int maxStatements,
final Map<String, Object> userSecret,
- final QueryContext context
+ final Map<String, Object> context
)
{
this.connectionId = Preconditions.checkNotNull(connectionId);
this.maxStatements = maxStatements;
this.userSecret = ImmutableMap.copyOf(userSecret);
- this.context = context;
- this.statements = new ConcurrentHashMap<>();
+ this.context = Preconditions.checkNotNull(context);
+ this.yielderOpenCloseExecutor = Execs.singleThreaded(
+ StringUtils.format(
+ "JDBCYielderOpenCloseExecutor-connection-%s",
+ StringUtils.encodeForFormat(connectionId)
+ )
+ );
}
public String getConnectionId()
@@ -77,7 +86,16 @@ public class DruidConnection
return connectionId;
}
- public DruidStatement createStatement(SqlLifecycleFactory
sqlLifecycleFactory)
+ public QueryContext makeContext()
+ {
+ // QueryContext constructor copies the context parameters.
+ // we don't want to stringify arrays for JDBC ever because Avatica needs
to handle this
+ final QueryContext queryContext = new QueryContext(context);
+ queryContext.addSystemParam(PlannerContext.CTX_SQL_STRINGIFY_ARRAYS,
false);
+ return queryContext;
+ }
+
+ public DruidJdbcStatement createStatement(SqlLifecycleFactory
sqlLifecycleFactory)
{
final int statementId = statementCounter.incrementAndGet();
@@ -89,36 +107,85 @@ public class DruidConnection
}
if (statements.size() >= maxStatements) {
- throw DruidMeta.logFailure(new ISE("Too many open statements, limit
is[%,d]", maxStatements));
+ throw DruidMeta.logFailure(new ISE("Too many open statements, limit is
[%,d]", maxStatements));
}
@SuppressWarnings("GuardedBy")
- final DruidStatement statement = new DruidStatement(
- connectionId,
+ final DruidJdbcStatement statement = new DruidJdbcStatement(
+ this,
statementId,
- context,
- sqlLifecycleFactory.factorize(),
- () -> {
- // onClose function for the statement
- LOG.debug("Connection[%s] closed statement[%s].", connectionId,
statementId);
- // statements will be accessed unsynchronized to avoid deadlock
- statements.remove(statementId);
- }
+ sqlLifecycleFactory
);
statements.put(statementId, statement);
- LOG.debug("Connection[%s] opened statement[%s].", connectionId,
statementId);
+ LOG.debug("Connection [%s] opened statement [%s].", connectionId,
statementId);
return statement;
}
}
- public DruidStatement getStatement(final int statementId)
+ public DruidJdbcPreparedStatement createPreparedStatement(
+ SqlLifecycleFactory sqlLifecycleFactory,
+ SqlQueryPlus queryPlus,
+ final long maxRowCount)
+ {
+ final int statementId = statementCounter.incrementAndGet();
+
+ synchronized (connectionLock) {
+ if (statements.containsKey(statementId)) {
+ // Will only happen if statementCounter rolls over before old
statements are cleaned up. If this
+ // ever happens then something fishy is going on, because we shouldn't
have billions of statements.
+ throw DruidMeta.logFailure(new ISE("Uh oh, too many statements"));
+ }
+
+ if (statements.size() >= maxStatements) {
+ throw DruidMeta.logFailure(new ISE("Too many open statements, limit is
[%,d]", maxStatements));
+ }
+
+ @SuppressWarnings("GuardedBy")
+ final DruidJdbcPreparedStatement jdbcStmt = new
DruidJdbcPreparedStatement(
+ this,
+ statementId,
+ queryPlus,
+ sqlLifecycleFactory,
+ maxRowCount
+ );
+ jdbcStmt.prepare();
+
+ statements.put(statementId, jdbcStmt);
+ LOG.debug("Connection [%s] opened prepared statement [%s].",
connectionId, statementId);
+ return jdbcStmt;
+ }
+ }
+
+ public void prepareAndExecute(
+ final DruidJdbcStatement druidStatement,
+ final SqlQueryPlus queryPlus,
+ final long maxRowCount
+ ) throws RelConversionException
+ {
+ Preconditions.checkNotNull(context, "JDBC connection context is null!");
+ druidStatement.execute(queryPlus.withContext(makeContext()), maxRowCount);
+ }
+
+ public AbstractDruidJdbcStatement getStatement(final int statementId)
{
synchronized (connectionLock) {
return statements.get(statementId);
}
}
+ public void closeStatement(int statementId)
+ {
+ AbstractDruidJdbcStatement stmt;
+ synchronized (connectionLock) {
+ stmt = statements.remove(statementId);
+ }
+ if (stmt != null) {
+ stmt.close();
+ LOG.debug("Connection [%s] closed statement [%s].", connectionId,
statementId);
+ }
+ }
+
/**
* Closes this connection if it has no statements.
*
@@ -139,18 +206,18 @@ public class DruidConnection
public void close()
{
synchronized (connectionLock) {
- // Copy statements before iterating because statement.close() modifies
it.
- for (DruidStatement statement :
ImmutableList.copyOf(statements.values())) {
+ open = false;
+ for (AbstractDruidJdbcStatement statement : statements.values()) {
try {
statement.close();
}
catch (Exception e) {
- LOG.warn("Connection[%s] failed to close statement[%s]!",
connectionId, statement.getStatementId());
+ LOG.warn("Connection [%s] failed to close statement [%s]!",
connectionId, statement.getStatementId());
}
}
-
- LOG.debug("Connection[%s] closed.", connectionId);
- open = false;
+ statements.clear();
+ yielderOpenCloseExecutor.shutdownNow();
+ LOG.debug("Connection [%s] closed.", connectionId);
}
}
@@ -167,4 +234,9 @@ public class DruidConnection
{
return userSecret;
}
+
+ public ExecutorService executor()
+ {
+ return yielderOpenCloseExecutor;
+ }
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
new file mode 100644
index 0000000000..5f23b8ee4d
--- /dev/null
+++
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.avatica;
+
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.remote.TypedValue;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.sql.SqlLifecycle;
+import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.SqlQueryPlus;
+import org.apache.druid.sql.calcite.planner.PrepareResult;
+
+import java.util.List;
+
+/**
+ * The Druid implementation of the server-side representation of the
+ * JDBC {@code PreparedStatement} class. A prepared statement can be prepared
+ * once with a query, then executed any number of times, typically with
+ * parameter values. No parameters are provided during prepare, though we do
+ * learn the parameter definitions passed back to the client and used by
+ * Avatica for serialization. Each execution produces a
+ * {@link DruidJdbcResultSet}. Only one execution is active at a time.
+ */
+public class DruidJdbcPreparedStatement extends AbstractDruidJdbcStatement
+{
+ private final SqlLifecycle sqlStatement;
+ private final SqlQueryPlus queryPlus;
+ private final SqlLifecycleFactory lifecycleFactory;
+ private final long maxRowCount;
+ private Meta.Signature signature;
+ private State state = State.NEW;
+
+ public DruidJdbcPreparedStatement(
+ final DruidConnection connection,
+ final int statementId,
+ final SqlQueryPlus queryPlus,
+ final SqlLifecycleFactory lifecycleFactory,
+ final long maxRowCount
+ )
+ {
+ super(connection, statementId);
+ this.lifecycleFactory = lifecycleFactory;
+ this.queryPlus = queryPlus;
+ this.maxRowCount = maxRowCount;
+ this.sqlStatement = lifecycleFactory.factorize();
+ sqlStatement.initialize(queryPlus.sql(), connection.makeContext());
+ }
+
+ public synchronized void prepare()
+ {
+ try {
+ ensure(State.NEW);
+ sqlStatement.validateAndAuthorize(queryPlus.authResult());
+ PrepareResult prepareResult = sqlStatement.prepare();
+ signature = createSignature(
+ prepareResult,
+ queryPlus.sql()
+ );
+ state = State.PREPARED;
+ }
+ catch (ForbiddenException e) {
+ // Can't finalize statement in in this case. Call will fail with an
+ // assertion error.
+ DruidMeta.logFailure(e);
+ state = State.CLOSED;
+ throw e;
+ }
+ catch (RuntimeException e) {
+ failed(e);
+ throw e;
+ }
+ catch (Throwable t) {
+ failed(t);
+ throw new RuntimeException(t);
+ }
+ }
+
+ @Override
+ public synchronized Meta.Signature getSignature()
+ {
+ ensure(State.PREPARED);
+ return signature;
+ }
+
+ public synchronized void execute(List<TypedValue> parameters)
+ {
+ ensure(State.PREPARED);
+ closeResultSet();
+ try {
+ SqlLifecycle directStmt = lifecycleFactory.factorize();
+ directStmt.initialize(queryPlus.sql(), connection.makeContext());
+ directStmt.setParameters(parameters);
+ resultSet = new DruidJdbcResultSet(this, queryPlus, directStmt,
maxRowCount);
+ resultSet.execute();
+ }
+ // Failure to execute does not close the prepared statement.
+ catch (RuntimeException e) {
+ failed(e);
+ throw e;
+ }
+ catch (Throwable t) {
+ failed(t);
+ throw new RuntimeException(t);
+ }
+ }
+
+ @GuardedBy("this")
+ private void ensure(final State... desiredStates)
+ {
+ for (State desiredState : desiredStates) {
+ if (state == desiredState) {
+ return;
+ }
+ }
+ throw new ISE("Invalid action for state [%s]", state);
+ }
+
+ private void failed(Throwable t)
+ {
+ super.close();
+ sqlStatement.finalizeStateAndEmitLogsAndMetrics(t, null, -1);
+ state = State.CLOSED;
+ }
+
+ @Override
+ public synchronized void close()
+ {
+ if (state != State.CLOSED) {
+ super.close();
+ sqlStatement.finalizeStateAndEmitLogsAndMetrics(null, null, -1);
+ }
+ state = State.CLOSED;
+ }
+
+ enum State
+ {
+ NEW,
+ PREPARED,
+ CLOSED
+ }
+}
diff --git
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
new file mode 100644
index 0000000000..d4c3eba1d0
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.avatica;
+
+import com.google.common.base.Preconditions;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.sql.SqlLifecycle;
+import org.apache.druid.sql.SqlQueryPlus;
+import org.apache.druid.sql.calcite.planner.PrepareResult;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Druid's server-side representation of a JDBC result set. At most one
+ * can be open per statement (standard or prepared). The implementation
+ * is based on Druid's {@link SqlLifecycle} class. Even if result
+ * set is for a {@code PreparedStatement}, the result set itself uses
+ * a Druid {@code SqlLifecycle} which includes the parameter values
+ * given for the execution. This allows Druid's planner to use the "query
+ * optimized" form of parameter substitution: we replan the query for
+ * each execution with the parameter values.
+ * <p>
+ * Avatica returns results in {@link Meta.Frame} objects as batches of
+ * rows. The result set uses the {@code TYPE_FORWARD_ONLY} execution model:
+ * the application can only read results sequentially, the application
+ * can't jump around or read backwards. As a result, the enclosing
+ * statement closes the result set at EOF to release resources early.
+ */
+public class DruidJdbcResultSet implements Closeable
+{
+ /**
+ * Query metrics can only be used within a single thread. Because results can
+ * be paginated into multiple JDBC frames (each frame being processed by a
+ * potentially different thread), the thread that closes the yielder
+ * (resulting in a QueryMetrics emit() call) may not be the same thread that
+ * created the yielder (which initializes DefaultQueryMetrics with the
current
+ * thread as the owner). Create and close the yielder with this single-thread
+ * executor to prevent this from happening.
+ * <p>
+ * The thread owner check in DefaultQueryMetrics is more aggressive than
+ * needed for this specific JDBC case, since the JDBC frames are processed
+ * sequentially. If the thread owner check is changed/loosened to permit this
+ * use case, we would not need to use this executor.
+ * <p>
+ * See discussion at:
+ * https://github.com/apache/druid/pull/4288
+ * https://github.com/apache/druid/pull/4415
+ */
+ private final AbstractDruidJdbcStatement jdbcStatement;
+ private final SqlQueryPlus sqlRequest;
+ private final SqlLifecycle stmt;
+ private final long maxRowCount;
+ private State state = State.NEW;
+ private Meta.Signature signature;
+ private Yielder<Object[]> yielder;
+ private int offset;
+
+ public DruidJdbcResultSet(
+ final AbstractDruidJdbcStatement jdbcStatement,
+ final SqlQueryPlus sqlRequest,
+ final SqlLifecycle stmt,
+ final long maxRowCount
+ )
+ {
+ this.jdbcStatement = jdbcStatement;
+ this.stmt = stmt;
+ this.sqlRequest = sqlRequest;
+ this.maxRowCount = maxRowCount;
+ }
+
+ public synchronized void execute() throws RelConversionException
+ {
+ ensure(State.NEW);
+ stmt.validateAndAuthorize(sqlRequest.authResult());
+ PrepareResult prepareResult = stmt.prepare();
+ stmt.plan();
+ signature = AbstractDruidJdbcStatement.createSignature(
+ prepareResult,
+ sqlRequest.sql()
+ );
+ try {
+ state = State.RUNNING;
+ final Sequence<Object[]> baseSequence =
jdbcStatement.executor().submit(stmt::execute).get();
+
+ // We can't apply limits greater than Integer.MAX_VALUE, ignore them.
+ final Sequence<Object[]> retSequence =
+ maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE
+ ? baseSequence.limit((int) maxRowCount)
+ : baseSequence;
+
+ yielder = Yielders.each(retSequence);
+ }
+ catch (Throwable t) {
+ throw closeAndPropagateThrowable(t);
+ }
+ }
+
+ public synchronized boolean isDone()
+ {
+ return state == State.DONE;
+ }
+
+ public synchronized Meta.Signature getSignature()
+ {
+ ensure(State.RUNNING, State.DONE);
+ return signature;
+ }
+
+ public synchronized Meta.Frame nextFrame(final long fetchOffset, final int
fetchMaxRowCount)
+ {
+ ensure(State.RUNNING, State.DONE);
+ Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] !=
offset [%,d]", fetchOffset, offset);
+ if (state == State.DONE) {
+ return new Meta.Frame(fetchOffset, true, Collections.emptyList());
+ }
+
+ try {
+ final List<Object> rows = new ArrayList<>();
+ while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset <
fetchOffset + fetchMaxRowCount)) {
+ rows.add(yielder.get());
+ yielder = yielder.next(null);
+ offset++;
+ }
+
+ if (yielder.isDone()) {
+ state = State.DONE;
+ }
+
+ return new Meta.Frame(fetchOffset, state == State.DONE, rows);
+ }
+ catch (Throwable t) {
+ throw closeAndPropagateThrowable(t);
+ }
+ }
+
+ public synchronized long getCurrentOffset()
+ {
+ ensure(State.RUNNING, State.DONE);
+ return offset;
+ }
+
+ @GuardedBy("this")
+ private void ensure(final State... desiredStates)
+ {
+ for (State desiredState : desiredStates) {
+ if (state == desiredState) {
+ return;
+ }
+ }
+ throw new ISE("Invalid action for state [%s]", state);
+ }
+
+ private RuntimeException closeAndPropagateThrowable(Throwable t)
+ {
+ DruidMeta.logFailure(t);
+ // Report a failure so that the failure is logged.
+ try {
+ close(t);
+ }
+ catch (Throwable t1) {
+ t.addSuppressed(t1);
+ }
+ finally {
+ state = State.FAILED;
+ }
+
+ // Avoid unnecessary wrapping.
+ if (t instanceof RuntimeException) {
+ return (RuntimeException) t;
+ }
+ return new RuntimeException(t);
+ }
+
+ @Override
+ public synchronized void close()
+ {
+ close(null);
+ }
+
+ private void close(Throwable error)
+ {
+ if (state == State.NEW) {
+ state = State.CLOSED;
+ }
+ if (state == State.CLOSED || state == State.FAILED) {
+ return;
+ }
+ state = State.CLOSED;
+ try {
+ if (yielder != null) {
+ Yielder<Object[]> theYielder = this.yielder;
+ this.yielder = null;
+
+ // Put the close last, so any exceptions it throws are after we did
the other cleanup above.
+ jdbcStatement.executor().submit(
+ () -> {
+ theYielder.close();
+ // makes this a Callable instead of Runnable so we don't need to
catch exceptions inside the lambda
+ return null;
+ }
+ ).get();
+
+ }
+ }
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ finally {
+ stmt.finalizeStateAndEmitLogsAndMetrics(error, null, -1);
+ }
+ }
+
+ private enum State
+ {
+ NEW,
+ RUNNING,
+ DONE,
+ FAILED,
+ CLOSED
+ }
+}
diff --git
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java
new file mode 100644
index 0000000000..ebe64f5bde
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.avatica;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.sql.SqlLifecycle;
+import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.SqlQueryPlus;
+
+/**
+ * Represents Druid's version of the JDBC {@code Statement} class:
+ * can be executed multiple times, one after another, producing a
+ * {@link DruidJdbcResultSet} for each execution.
+ */
+public class DruidJdbcStatement extends AbstractDruidJdbcStatement
+{
+ private final SqlLifecycleFactory lifecycleFactory;
+ protected boolean closed;
+
+ public DruidJdbcStatement(
+ final DruidConnection connection,
+ final int statementId,
+ final SqlLifecycleFactory lifecycleFactory
+ )
+ {
+ super(connection, statementId);
+ this.lifecycleFactory = Preconditions.checkNotNull(lifecycleFactory,
"lifecycleFactory");
+ }
+
+ public synchronized void execute(SqlQueryPlus sqlRequest, long maxRowCount)
throws RelConversionException
+ {
+ closeResultSet();
+ SqlLifecycle stmt = lifecycleFactory.factorize();
+ stmt.initialize(sqlRequest.sql(), connection.makeContext());
+ try {
+ stmt.validateAndAuthorize(sqlRequest.authResult());
+ resultSet = new DruidJdbcResultSet(this, sqlRequest, stmt,
Long.MAX_VALUE);
+ resultSet.execute();
+ }
+ catch (ForbiddenException e) {
+ // Can't finalize statement in in this case. Call will fail with an
+ // assertion error.
+ resultSet = null;
+ DruidMeta.logFailure(e);
+ throw e;
+ }
+ catch (Throwable t) {
+ stmt.finalizeStateAndEmitLogsAndMetrics(t, null, -1);
+ resultSet = null;
+ throw t;
+ }
+ }
+
+ @Override
+ public Meta.Signature getSignature()
+ {
+ return requireResultSet().getSignature();
+ }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
index 781cae53da..d318bd4bf3 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
@@ -40,20 +40,19 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.QueryContext;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Authenticator;
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.calcite.planner.Calcites;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -108,7 +107,7 @@ public class DruidMeta extends MetaImpl
private final ErrorHandler errorHandler;
/**
- * Used to track logical connections.
+ * Tracks logical connections.
*/
private final ConcurrentMap<String, DruidConnection> connections = new
ConcurrentHashMap<>();
@@ -157,16 +156,13 @@ public class DruidMeta extends MetaImpl
}
}
}
- // we don't want to stringify arrays for JDBC ever because avatica needs
to handle this
- final QueryContext context = new QueryContext(contextMap);
- context.addSystemParam(PlannerContext.CTX_SQL_STRINGIFY_ARRAYS, false);
- openDruidConnection(ch.id, secret, context);
+ openDruidConnection(ch.id, secret, contextMap);
}
catch (NoSuchConnectionException e) {
throw e;
}
catch (Throwable t) {
- // we want to avoid sanitizing avatica specific exceptions as the
avatica code can rely on them to handle issues
+ // we want to avoid sanitizing Avatica specific exceptions as the
Avatica code can rely on them to handle issues
// differently
throw errorHandler.sanitize(t);
}
@@ -206,11 +202,16 @@ public class DruidMeta extends MetaImpl
}
}
+ /**
+ * Creates a new implementation of the one-pass JDBC {@code Statement}
+ * class. Corresponds to the JDBC {@code Connection.createStatement()}
+ * method.
+ */
@Override
public StatementHandle createStatement(final ConnectionHandle ch)
{
try {
- final DruidStatement druidStatement =
getDruidConnection(ch.id).createStatement(sqlLifecycleFactory);
+ final DruidJdbcStatement druidStatement =
getDruidConnection(ch.id).createStatement(sqlLifecycleFactory);
return new StatementHandle(ch.id, druidStatement.getStatementId(), null);
}
catch (NoSuchConnectionException e) {
@@ -221,6 +222,11 @@ public class DruidMeta extends MetaImpl
}
}
+ /**
+ * Creates a new implementation of the JDBC {@code PreparedStatement}
+ * class which allows preparing once, executing many times. Corresponds to
+ * the JDBC {@code Connection.prepareStatement()} call.
+ */
@Override
public StatementHandle prepare(
final ConnectionHandle ch,
@@ -229,26 +235,19 @@ public class DruidMeta extends MetaImpl
)
{
try {
- final StatementHandle statement = createStatement(ch);
- final DruidStatement druidStatement;
- try {
- druidStatement = getDruidStatement(statement);
- }
- catch (NoSuchStatementException e) {
- throw logFailure(new ISE(e, e.getMessage()));
- }
- final DruidConnection druidConnection =
getDruidConnection(statement.connectionId);
- AuthenticationResult authenticationResult =
authenticateConnection(druidConnection);
- if (authenticationResult == null) {
- throw logFailure(
- new ForbiddenException("Authentication failed."),
- "Authentication failed for statement[%s]",
- druidStatement.getStatementId()
- );
- }
- statement.signature = druidStatement.prepare(sql, maxRowCount,
authenticationResult).getSignature();
- LOG.debug("Successfully prepared statement[%s] for execution",
druidStatement.getStatementId());
- return statement;
+ final DruidConnection druidConnection = getDruidConnection(ch.id);
+ SqlQueryPlus sqlReq = new SqlQueryPlus(
+ sql,
+ null, // Context provided by connection
+ null, // No parameters in this path
+ doAuthenticate(druidConnection)
+ );
+ DruidJdbcPreparedStatement stmt =
druidConnection.createPreparedStatement(
+ sqlLifecycleFactory,
+ sqlReq,
+ maxRowCount);
+ LOG.debug("Successfully prepared statement [%s] for execution",
stmt.getStatementId());
+ return new StatementHandle(ch.id, stmt.getStatementId(),
stmt.getSignature());
}
catch (NoSuchConnectionException e) {
throw e;
@@ -258,6 +257,18 @@ public class DruidMeta extends MetaImpl
}
}
+ private AuthenticationResult doAuthenticate(final DruidConnection
druidConnection)
+ {
+ AuthenticationResult authenticationResult =
authenticateConnection(druidConnection);
+ if (authenticationResult == null) {
+ throw logFailure(
+ new ForbiddenException("Authentication failed."),
+ "Authentication failed for prepare"
+ );
+ }
+ return authenticationResult;
+ }
+
@Deprecated
@Override
public ExecuteResult prepareAndExecute(
@@ -271,6 +282,9 @@ public class DruidMeta extends MetaImpl
throw errorHandler.sanitize(new UOE("Deprecated"));
}
+ /**
+ * Prepares and executes a JDBC {@code Statement}
+ */
@Override
public ExecuteResult prepareAndExecute(
final StatementHandle statement,
@@ -280,39 +294,19 @@ public class DruidMeta extends MetaImpl
final PrepareCallback callback
) throws NoSuchStatementException
{
+
try {
// Ignore "callback", this class is designed for use with LocalService
which doesn't use it.
- final DruidStatement druidStatement = getDruidStatement(statement);
+ final DruidJdbcStatement druidStatement = getDruidStatement(statement,
DruidJdbcStatement.class);
final DruidConnection druidConnection =
getDruidConnection(statement.connectionId);
- AuthenticationResult authenticationResult =
authenticateConnection(druidConnection);
- if (authenticationResult == null) {
- throw logFailure(
- new ForbiddenException("Authentication failed."),
- "Authentication failed for statement[%s]",
- druidStatement.getStatementId()
- );
- }
- druidStatement.prepare(sql, maxRowCount, authenticationResult);
- final Frame firstFrame = druidStatement.execute(Collections.emptyList())
- .nextFrame(
- DruidStatement.START_OFFSET,
-
getEffectiveMaxRowsPerFrame(maxRowsInFirstFrame)
- );
- final Signature signature = druidStatement.getSignature();
- LOG.debug("Successfully prepared statement[%s] and started execution",
druidStatement.getStatementId());
- return new ExecuteResult(
- ImmutableList.of(
- MetaResultSet.create(
- statement.connectionId,
- statement.id,
- false,
- signature,
- firstFrame
- )
- )
- );
- }
- // cannot affect these exceptions as avatica handles them
+ // No parameters for a "regular" JDBC statement.
+ SqlQueryPlus sqlRequest = new SqlQueryPlus(sql, null, null,
doAuthenticate(druidConnection));
+ druidConnection.prepareAndExecute(druidStatement, sqlRequest,
maxRowCount);
+ ExecuteResult result = doFetch(druidStatement, maxRowsInFirstFrame);
+ LOG.debug("Successfully prepared statement [%s] and started execution",
druidStatement.getStatementId());
+ return result;
+ }
+ // Cannot affect these exceptions as Avatica handles them.
catch (NoSuchConnectionException | NoSuchStatementException e) {
throw e;
}
@@ -321,6 +315,27 @@ public class DruidMeta extends MetaImpl
}
}
+ private ExecuteResult doFetch(AbstractDruidJdbcStatement druidStatement, int
maxRows)
+ {
+ final Signature signature = druidStatement.getSignature();
+ final Frame firstFrame = druidStatement.nextFrame(
+ AbstractDruidJdbcStatement.START_OFFSET,
+ getEffectiveMaxRowsPerFrame(maxRows)
+ );
+
+ return new ExecuteResult(
+ ImmutableList.of(
+ MetaResultSet.create(
+ druidStatement.getConnectionId(),
+ druidStatement.statementId,
+ false,
+ signature,
+ firstFrame
+ )
+ )
+ );
+ }
+
@Override
public ExecuteBatchResult prepareAndExecuteBatch(
final StatementHandle statement,
@@ -351,7 +366,7 @@ public class DruidMeta extends MetaImpl
try {
final int maxRows = getEffectiveMaxRowsPerFrame(fetchMaxRowCount);
LOG.debug("Fetching next frame from offset[%s] with [%s] rows for
statement[%s]", offset, maxRows, statement.id);
- return getDruidStatement(statement).nextFrame(offset, maxRows);
+ return getDruidStatement(statement,
AbstractDruidJdbcStatement.class).nextFrame(offset, maxRows);
}
catch (NoSuchConnectionException e) {
throw e;
@@ -381,26 +396,14 @@ public class DruidMeta extends MetaImpl
) throws NoSuchStatementException
{
try {
- final DruidStatement druidStatement = getDruidStatement(statement);
- final Frame firstFrame = druidStatement.execute(parameterValues)
- .nextFrame(
- DruidStatement.START_OFFSET,
-
getEffectiveMaxRowsPerFrame(maxRowsInFirstFrame)
- );
-
- final Signature signature = druidStatement.getSignature();
- LOG.debug("Successfully started execution of statement[%s]",
druidStatement.getStatementId());
- return new ExecuteResult(
- ImmutableList.of(
- MetaResultSet.create(
- statement.connectionId,
- statement.id,
- false,
- signature,
- firstFrame
- )
- )
- );
+ final DruidJdbcPreparedStatement druidStatement =
+ getDruidStatement(statement, DruidJdbcPreparedStatement.class);
+ druidStatement.execute(parameterValues);
+ ExecuteResult result = doFetch(druidStatement, maxRowsInFirstFrame);
+ LOG.debug(
+ "Successfully started execution of statement[%s]",
+ druidStatement.getStatementId());
+ return result;
}
catch (NoSuchStatementException | NoSuchConnectionException e) {
throw e;
@@ -430,10 +433,7 @@ public class DruidMeta extends MetaImpl
// connections.get, not getDruidConnection, since we want to silently
ignore nonexistent statements
final DruidConnection druidConnection = connections.get(h.connectionId);
if (druidConnection != null) {
- final DruidStatement druidStatement =
druidConnection.getStatement(h.id);
- if (druidStatement != null) {
- druidStatement.close();
- }
+ druidConnection.closeStatement(h.id);
}
}
catch (NoSuchConnectionException e) {
@@ -452,7 +452,7 @@ public class DruidMeta extends MetaImpl
) throws NoSuchStatementException
{
try {
- final DruidStatement druidStatement = getDruidStatement(sh);
+ final AbstractDruidJdbcStatement druidStatement = getDruidStatement(sh,
AbstractDruidJdbcStatement.class);
final boolean isDone = druidStatement.isDone();
final long currentOffset = druidStatement.getCurrentOffset();
if (currentOffset != offset) {
@@ -729,7 +729,7 @@ public class DruidMeta extends MetaImpl
private DruidConnection openDruidConnection(
final String connectionId,
final Map<String, Object> userSecret,
- final QueryContext context
+ final Map<String, Object> context
)
{
if (connectionCount.incrementAndGet() > config.getMaxConnections()) {
@@ -804,14 +804,22 @@ public class DruidMeta extends MetaImpl
}
@Nonnull
- private DruidStatement getDruidStatement(final StatementHandle statement)
throws NoSuchStatementException
+ private <T extends AbstractDruidJdbcStatement> T getDruidStatement(
+ final StatementHandle statement,
+ final Class<T> stmtClass
+ ) throws NoSuchStatementException
{
final DruidConnection connection =
getDruidConnection(statement.connectionId);
- final DruidStatement druidStatement =
connection.getStatement(statement.id);
+ final AbstractDruidJdbcStatement druidStatement =
connection.getStatement(statement.id);
if (druidStatement == null) {
throw logFailure(new NoSuchStatementException(statement));
}
- return druidStatement;
+ try {
+ return stmtClass.cast(druidStatement);
+ }
+ catch (ClassCastException e) {
+ throw logFailure(new NoSuchStatementException(statement));
+ }
}
private MetaResultSet sqlResultSet(final ConnectionHandle ch, final String
sql)
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java
deleted file mode 100644
index b3c7e41284..0000000000
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java
+++ /dev/null
@@ -1,449 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.sql.avatica;
-
-import com.google.common.base.Preconditions;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.apache.calcite.avatica.AvaticaParameter;
-import org.apache.calcite.avatica.ColumnMetaData;
-import org.apache.calcite.avatica.Meta;
-import org.apache.calcite.avatica.remote.TypedValue;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.guava.Yielders;
-import org.apache.druid.query.QueryContext;
-import org.apache.druid.server.security.AuthenticationResult;
-import org.apache.druid.server.security.ForbiddenException;
-import org.apache.druid.sql.SqlLifecycle;
-import org.apache.druid.sql.calcite.planner.Calcites;
-import org.apache.druid.sql.calcite.planner.PrepareResult;
-
-import java.io.Closeable;
-import java.sql.Array;
-import java.sql.DatabaseMetaData;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-/**
- * Statement handle for {@link DruidMeta}. Thread-safe.
- */
-public class DruidStatement implements Closeable
-{
- public static final long START_OFFSET = 0;
- private final String connectionId;
- private final int statementId;
- private final QueryContext queryContext;
- @GuardedBy("lock")
- private final SqlLifecycle sqlLifecycle;
- private final Runnable onClose;
- private final Object lock = new Object();
- /**
- * Query metrics can only be used within a single thread. Because results
can be paginated into multiple
- * JDBC frames (each frame being processed by a potentially different
thread), the thread that closes the yielder
- * (resulting in a QueryMetrics emit() call) may not be the same thread that
created the yielder (which initializes
- * DefaultQueryMetrics with the current thread as the owner). Create and
close the yielder with this
- * single-thread executor to prevent this from happening.
- * <p>
- * The thread owner check in DefaultQueryMetrics is more aggressive than
needed for this specific JDBC case, since
- * the JDBC frames are processed sequentially. If the thread owner check is
changed/loosened to permit this use case,
- * we would not need to use this executor.
- * <p>
- * See discussion at:
- * https://github.com/apache/druid/pull/4288
- * https://github.com/apache/druid/pull/4415
- */
- private final ExecutorService yielderOpenCloseExecutor;
- private State state = State.NEW;
- private String query;
- private long maxRowCount;
- private Meta.Signature signature;
- private Yielder<Object[]> yielder;
- private int offset = 0;
- private Throwable throwable;
- private AuthenticationResult authenticationResult;
-
- public DruidStatement(
- final String connectionId,
- final int statementId,
- final QueryContext queryContext,
- final SqlLifecycle sqlLifecycle,
- final Runnable onClose
- )
- {
- this.connectionId = Preconditions.checkNotNull(connectionId,
"connectionId");
- this.statementId = statementId;
- this.queryContext = queryContext;
- this.sqlLifecycle = Preconditions.checkNotNull(sqlLifecycle,
"sqlLifecycle");
- this.onClose = Preconditions.checkNotNull(onClose, "onClose");
- this.yielderOpenCloseExecutor = Execs.singleThreaded(
- StringUtils.format(
- "JDBCYielderOpenCloseExecutor-connection-%s-statement-%d",
- StringUtils.encodeForFormat(connectionId),
- statementId
- )
- );
- }
-
- public static List<ColumnMetaData> createColumnMetaData(final RelDataType
rowType)
- {
- final List<ColumnMetaData> columns = new ArrayList<>();
- List<RelDataTypeField> fieldList = rowType.getFieldList();
-
- for (int i = 0; i < fieldList.size(); i++) {
- RelDataTypeField field = fieldList.get(i);
-
- final ColumnMetaData.AvaticaType columnType;
- if (field.getType().getSqlTypeName() == SqlTypeName.ARRAY) {
- final ColumnMetaData.Rep elementRep =
rep(field.getType().getComponentType().getSqlTypeName());
- final ColumnMetaData.ScalarType elementType = ColumnMetaData.scalar(
-
field.getType().getComponentType().getSqlTypeName().getJdbcOrdinal(),
- field.getType().getComponentType().getSqlTypeName().getName(),
- elementRep
- );
- final ColumnMetaData.Rep arrayRep =
rep(field.getType().getSqlTypeName());
- columnType = ColumnMetaData.array(
- elementType,
- field.getType().getSqlTypeName().getName(),
- arrayRep
- );
- } else {
- final ColumnMetaData.Rep rep = rep(field.getType().getSqlTypeName());
- columnType = ColumnMetaData.scalar(
- field.getType().getSqlTypeName().getJdbcOrdinal(),
- field.getType().getSqlTypeName().getName(),
- rep
- );
- }
- columns.add(
- new ColumnMetaData(
- i, // ordinal
- false, // auto increment
- true, // case sensitive
- false, // searchable
- false, // currency
- field.getType().isNullable()
- ? DatabaseMetaData.columnNullable
- : DatabaseMetaData.columnNoNulls, // nullable
- true, // signed
- field.getType().getPrecision(), // display size
- field.getName(), // label
- null, // column name
- null, // schema name
- field.getType().getPrecision(), // precision
- field.getType().getScale(), // scale
- null, // table name
- null, // catalog name
- columnType, // avatica type
- true, // read only
- false, // writable
- false, // definitely writable
- columnType.columnClassName() // column class name
- )
- );
- }
-
- return columns;
- }
-
- public DruidStatement prepare(
- final String query,
- final long maxRowCount,
- final AuthenticationResult authenticationResult
- )
- {
- synchronized (lock) {
- try {
- ensure(State.NEW);
- sqlLifecycle.initialize(query, queryContext);
- sqlLifecycle.validateAndAuthorize(authenticationResult);
- this.authenticationResult = authenticationResult;
- PrepareResult prepareResult = sqlLifecycle.prepare();
- this.maxRowCount = maxRowCount;
- this.query = query;
- List<AvaticaParameter> params = new ArrayList<>();
- final RelDataType parameterRowType =
prepareResult.getParameterRowType();
- for (RelDataTypeField field : parameterRowType.getFieldList()) {
- RelDataType type = field.getType();
- params.add(createParameter(field, type));
- }
- this.signature = Meta.Signature.create(
- createColumnMetaData(prepareResult.getRowType()),
- query,
- params,
- Meta.CursorFactory.ARRAY,
- Meta.StatementType.SELECT // We only support SELECT
- );
- this.state = State.PREPARED;
- }
- catch (Throwable t) {
- return closeAndPropagateThrowable(t);
- }
-
- return this;
- }
- }
-
-
- public DruidStatement execute(List<TypedValue> parameters)
- {
- synchronized (lock) {
- ensure(State.PREPARED);
- try {
- sqlLifecycle.setParameters(parameters);
- sqlLifecycle.validateAndAuthorize(authenticationResult);
- sqlLifecycle.plan();
- final Sequence<Object[]> baseSequence =
yielderOpenCloseExecutor.submit(sqlLifecycle::execute).get();
-
- // We can't apply limits greater than Integer.MAX_VALUE, ignore them.
- final Sequence<Object[]> retSequence =
- maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE
- ? baseSequence.limit((int) maxRowCount)
- : baseSequence;
-
- yielder = Yielders.each(retSequence);
- state = State.RUNNING;
- }
- catch (Throwable t) {
- closeAndPropagateThrowable(t);
- }
-
- return this;
- }
- }
-
- public String getConnectionId()
- {
- return connectionId;
- }
-
- public int getStatementId()
- {
- return statementId;
- }
-
- public String getQuery()
- {
- synchronized (lock) {
- ensure(State.PREPARED, State.RUNNING, State.DONE);
- return query;
- }
- }
-
- public Meta.Signature getSignature()
- {
- synchronized (lock) {
- ensure(State.PREPARED, State.RUNNING, State.DONE);
- return signature;
- }
- }
-
- public long getCurrentOffset()
- {
- synchronized (lock) {
- ensure(State.RUNNING, State.DONE);
- return offset;
- }
- }
-
- public boolean isDone()
- {
- synchronized (lock) {
- return state == State.DONE;
- }
- }
-
- public Meta.Frame nextFrame(final long fetchOffset, final int
fetchMaxRowCount)
- {
- synchronized (lock) {
- ensure(State.RUNNING);
- Preconditions.checkState(fetchOffset == offset, "fetchOffset[%,d] !=
offset[%,d]", fetchOffset, offset);
-
- try {
- final List<Object> rows = new ArrayList<>();
- while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset <
fetchOffset + fetchMaxRowCount)) {
- rows.add(yielder.get());
- yielder = yielder.next(null);
- offset++;
- }
-
- final boolean done = yielder.isDone();
- if (done) {
- close();
- }
-
- return new Meta.Frame(fetchOffset, done, rows);
- }
- catch (Throwable t) {
- this.throwable = t;
- try {
- close();
- }
- catch (Throwable t1) {
- t.addSuppressed(t1);
- }
- throw t;
- }
- }
- }
-
- @Override
- public void close()
- {
- State oldState = null;
- try {
- synchronized (lock) {
- oldState = state;
- state = State.DONE;
- if (yielder != null) {
- Yielder<Object[]> theYielder = this.yielder;
- this.yielder = null;
-
- // Put the close last, so any exceptions it throws are after we did
the other cleanup above.
- yielderOpenCloseExecutor.submit(
- () -> {
- theYielder.close();
- // makes this a Callable instead of Runnable so we don't need
to catch exceptions inside the lambda
- return null;
- }
- ).get();
-
- yielderOpenCloseExecutor.shutdownNow();
- }
- }
- }
- catch (Throwable t) {
- if (oldState != State.DONE) {
- // First close. Run the onClose function.
- try {
- onClose.run();
- synchronized (lock) {
- sqlLifecycle.finalizeStateAndEmitLogsAndMetrics(t, null, -1);
- }
- }
- catch (Throwable t1) {
- t.addSuppressed(t1);
- }
- }
-
- throw new RuntimeException(t);
- }
-
- if (oldState != State.DONE) {
- // First close. Run the onClose function.
- try {
- if (!(this.throwable instanceof ForbiddenException)) {
- synchronized (lock) {
- sqlLifecycle.finalizeStateAndEmitLogsAndMetrics(this.throwable,
null, -1);
- }
- } else {
- DruidMeta.logFailure(this.throwable);
- }
- onClose.run();
- }
- catch (Throwable t) {
- throw new RuntimeException(t);
- }
- }
- }
-
- private AvaticaParameter createParameter(RelDataTypeField field, RelDataType
type)
- {
- // signed is always false because no way to extract from RelDataType, and
the only usage of this AvaticaParameter
- // constructor I can find, in CalcitePrepareImpl, does it this way with
hard coded false
- return new AvaticaParameter(
- false,
- type.getPrecision(),
- type.getScale(),
- type.getSqlTypeName().getJdbcOrdinal(),
- type.getSqlTypeName().getName(),
- Calcites.sqlTypeNameJdbcToJavaClass(type.getSqlTypeName()).getName(),
- field.getName()
- );
- }
-
-
- private DruidStatement closeAndPropagateThrowable(Throwable t)
- {
- this.throwable = t;
- DruidMeta.logFailure(t);
- try {
- close();
- }
- catch (Throwable t1) {
- t.addSuppressed(t1);
- }
- throw new RuntimeException(t);
- }
-
- @GuardedBy("lock")
- private void ensure(final State... desiredStates)
- {
- for (State desiredState : desiredStates) {
- if (state == desiredState) {
- return;
- }
- }
- throw new ISE("Invalid action for state[%s]", state);
- }
-
- private static ColumnMetaData.Rep rep(final SqlTypeName sqlType)
- {
- if (SqlTypeName.CHAR_TYPES.contains(sqlType)) {
- return ColumnMetaData.Rep.of(String.class);
- } else if (sqlType == SqlTypeName.TIMESTAMP) {
- return ColumnMetaData.Rep.of(Long.class);
- } else if (sqlType == SqlTypeName.DATE) {
- return ColumnMetaData.Rep.of(Integer.class);
- } else if (sqlType == SqlTypeName.INTEGER) {
- // use Number.class for exact numeric types since JSON transport might
switch longs to integers
- return ColumnMetaData.Rep.of(Number.class);
- } else if (sqlType == SqlTypeName.BIGINT) {
- // use Number.class for exact numeric types since JSON transport might
switch longs to integers
- return ColumnMetaData.Rep.of(Number.class);
- } else if (sqlType == SqlTypeName.FLOAT) {
- return ColumnMetaData.Rep.of(Float.class);
- } else if (sqlType == SqlTypeName.DOUBLE || sqlType ==
SqlTypeName.DECIMAL) {
- return ColumnMetaData.Rep.of(Double.class);
- } else if (sqlType == SqlTypeName.BOOLEAN) {
- return ColumnMetaData.Rep.of(Boolean.class);
- } else if (sqlType == SqlTypeName.OTHER) {
- return ColumnMetaData.Rep.of(Object.class);
- } else if (sqlType == SqlTypeName.ARRAY) {
- return ColumnMetaData.Rep.of(Array.class);
- } else {
- throw new ISE("No rep for SQL type[%s]", sqlType);
- }
- }
-
- enum State
- {
- NEW,
- PREPARED,
- RUNNING,
- DONE
- }
-}
diff --git
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 0e7128b069..81167d919f 100644
---
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -54,6 +54,7 @@ import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.server.DruidNode;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QuerySchedulerProvider;
@@ -109,22 +110,29 @@ import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
-public abstract class DruidAvaticaHandlerTest extends CalciteTestBase
+/**
+ * Tests the Avatica-based JDBC implementation using JSON serialization. See
+ * {@link DruidAvaticaProtobufHandlerTest} for a subclass which runs
+ * this same set of tests using Protobuf serialization.
+ */
+public class DruidAvaticaHandlerTest extends CalciteTestBase
{
private static final AvaticaServerConfig AVATICA_CONFIG = new
AvaticaServerConfig()
{
@Override
public int getMaxConnections()
{
- // This must match the number of Connection objects created in setUp()
+ // This must match the number of Connection objects created in
testTooManyStatements()
return 4;
}
@@ -146,6 +154,7 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
{
resourceCloser = Closer.create();
conglomerate =
QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
+ System.setProperty("user.timezone", "UTC");
}
@AfterClass
@@ -260,139 +269,155 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
}
@Test
- public void testSelectCount() throws Exception
+ public void testSelectCount() throws SQLException
{
- final ResultSet resultSet = client.createStatement().executeQuery("SELECT
COUNT(*) AS cnt FROM druid.foo");
- final List<Map<String, Object>> rows = getRows(resultSet);
- Assert.assertEquals(
- ImmutableList.of(
- ImmutableMap.of("cnt", 6L)
- ),
- rows
- );
+ try (Statement stmt = client.createStatement()) {
+ final ResultSet resultSet = stmt.executeQuery("SELECT COUNT(*) AS cnt
FROM druid.foo");
+ final List<Map<String, Object>> rows = getRows(resultSet);
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of("cnt", 6L)
+ ),
+ rows
+ );
+ }
}
@Test
- public void testSelectCountNoTrailingSlash() throws Exception
+ public void testSelectCountNoTrailingSlash() throws SQLException
{
- final ResultSet resultSet =
clientNoTrailingSlash.createStatement().executeQuery("SELECT COUNT(*) AS cnt
FROM druid.foo");
- final List<Map<String, Object>> rows = getRows(resultSet);
- Assert.assertEquals(
- ImmutableList.of(
- ImmutableMap.of("cnt", 6L)
- ),
- rows
- );
+ try (Statement stmt = clientNoTrailingSlash.createStatement()) {
+ final ResultSet resultSet = stmt.executeQuery("SELECT COUNT(*) AS cnt
FROM druid.foo");
+ final List<Map<String, Object>> rows = getRows(resultSet);
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of("cnt", 6L)
+ ),
+ rows
+ );
+ }
}
@Test
- public void testSelectCountAlternateStyle() throws Exception
+ public void testSelectCountAlternateStyle() throws SQLException
{
- final ResultSet resultSet = client.prepareStatement("SELECT COUNT(*) AS
cnt FROM druid.foo").executeQuery();
- final List<Map<String, Object>> rows = getRows(resultSet);
- Assert.assertEquals(
- ImmutableList.of(
- ImmutableMap.of("cnt", 6L)
- ),
- rows
- );
+ try (PreparedStatement stmt = client.prepareStatement("SELECT COUNT(*) AS
cnt FROM druid.foo")) {
+ final ResultSet resultSet = stmt.executeQuery();
+ final List<Map<String, Object>> rows = getRows(resultSet);
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of("cnt", 6L)
+ ),
+ rows
+ );
+ }
}
@Test
- public void testTimestampsInResponse() throws Exception
+ public void testTimestampsInResponse() throws SQLException
{
- final ResultSet resultSet = client.createStatement().executeQuery(
- "SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1"
- );
+ try (Statement stmt = client.createStatement()) {
+ final ResultSet resultSet = stmt.executeQuery(
+ "SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1"
+ );
- Assert.assertEquals(
- ImmutableList.of(
- ImmutableMap.of(
- "__time", new
Timestamp(DateTimes.of("2000-01-01T00:00:00.000Z").getMillis()),
- "t2", new Date(DateTimes.of("2000-01-01").getMillis())
- )
- ),
- getRows(resultSet)
- );
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of(
+ "__time", new
Timestamp(DateTimes.of("2000-01-01T00:00:00.000Z").getMillis()),
+ "t2", new Date(DateTimes.of("2000-01-01").getMillis())
+ )
+ ),
+ getRows(resultSet)
+ );
+ }
}
@Test
- public void testTimestampsInResponseLosAngelesTimeZone() throws Exception
+ public void testTimestampsInResponseLosAngelesTimeZone() throws SQLException
{
- final ResultSet resultSet =
clientLosAngeles.createStatement().executeQuery(
- "SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1"
- );
+ try (Statement stmt = clientLosAngeles.createStatement()) {
+ final ResultSet resultSet = stmt.executeQuery(
+ "SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1"
+ );
- final DateTimeZone timeZone =
DateTimes.inferTzFromString("America/Los_Angeles");
- final DateTime localDateTime = new DateTime("2000-01-01T00Z", timeZone);
+ final DateTimeZone timeZone =
DateTimes.inferTzFromString("America/Los_Angeles");
+ final DateTime localDateTime = new DateTime("2000-01-01T00Z", timeZone);
- final List<Map<String, Object>> resultRows = getRows(resultSet);
+ final List<Map<String, Object>> resultRows = getRows(resultSet);
- Assert.assertEquals(
- ImmutableList.of(
- ImmutableMap.of(
- "__time", new
Timestamp(Calcites.jodaToCalciteTimestamp(localDateTime, timeZone)),
- "t2", new
Date(Calcites.jodaToCalciteTimestamp(localDateTime.dayOfMonth().roundFloorCopy(),
timeZone))
- )
- ),
- resultRows
- );
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of(
+ "__time", new
Timestamp(Calcites.jodaToCalciteTimestamp(localDateTime, timeZone)),
+ "t2", new
Date(Calcites.jodaToCalciteTimestamp(localDateTime.dayOfMonth().roundFloorCopy(),
timeZone))
+ )
+ ),
+ resultRows
+ );
+ }
}
@Test
- public void testFieldAliasingSelect() throws Exception
+ public void testFieldAliasingSelect() throws SQLException
{
- final ResultSet resultSet = client.createStatement().executeQuery(
- "SELECT dim2 AS \"x\", dim2 AS \"y\" FROM druid.foo LIMIT 1"
- );
+ try (Statement stmt = client.createStatement()) {
+ final ResultSet resultSet = stmt.executeQuery(
+ "SELECT dim2 AS \"x\", dim2 AS \"y\" FROM druid.foo LIMIT 1"
+ );
- Assert.assertEquals(
- ImmutableList.of(
- ImmutableMap.of("x", "a", "y", "a")
- ),
- getRows(resultSet)
- );
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of("x", "a", "y", "a")
+ ),
+ getRows(resultSet)
+ );
+ }
}
@Test
- public void testSelectBoolean() throws Exception
+ public void testSelectBoolean() throws SQLException
{
- final ResultSet resultSet = client.createStatement().executeQuery(
- "SELECT dim2, dim2 IS NULL AS isnull FROM druid.foo LIMIT 1"
- );
+ try (Statement stmt = client.createStatement()) {
+ final ResultSet resultSet = stmt.executeQuery(
+ "SELECT dim2, dim2 IS NULL AS isnull FROM druid.foo LIMIT 1"
+ );
- Assert.assertEquals(
- ImmutableList.of(
- ImmutableMap.of("dim2", "a", "isnull", false)
- ),
- getRows(resultSet)
- );
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of("dim2", "a", "isnull", false)
+ ),
+ getRows(resultSet)
+ );
+ }
}
@Test
- public void testExplainSelectCount() throws Exception
+ public void testExplainSelectCount() throws SQLException
{
- final ResultSet resultSet =
clientLosAngeles.createStatement().executeQuery(
- "EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo"
- );
+ try (Statement stmt = clientLosAngeles.createStatement()) {
+ final ResultSet resultSet = stmt.executeQuery(
+ "EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo"
+ );
- Assert.assertEquals(
- ImmutableList.of(
- ImmutableMap.of(
- "PLAN",
-
StringUtils.format("DruidQueryRel(query=[{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"context\":{\"sqlQueryId\":\"%s\",\"sqlStringifyArrays\":false,\"sqlTimeZone\":\"America/Los_Angeles\"}}],
signature=[{a0:LONG}])\n",
- DUMMY_SQL_QUERY_ID
- ),
- "RESOURCES",
- "[{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]"
- )
- ),
- getRows(resultSet)
- );
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of(
+ "PLAN",
+
StringUtils.format("DruidQueryRel(query=[{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"context\":{\"sqlQueryId\":\"%s\",\"sqlStringifyArrays\":false,\"sqlTimeZone\":\"America/Los_Angeles\"}}],
signature=[{a0:LONG}])\n",
+ DUMMY_SQL_QUERY_ID
+ ),
+ "RESOURCES",
+ "[{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]"
+ )
+ ),
+ getRows(resultSet)
+ );
+ }
}
@Test
- public void testDatabaseMetaDataCatalogs() throws Exception
+ public void testDatabaseMetaDataCatalogs() throws SQLException
{
final DatabaseMetaData metaData = client.getMetaData();
Assert.assertEquals(
@@ -404,7 +429,7 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
}
@Test
- public void testDatabaseMetaDataSchemas() throws Exception
+ public void testDatabaseMetaDataSchemas() throws SQLException
{
final DatabaseMetaData metaData = client.getMetaData();
Assert.assertEquals(
@@ -416,7 +441,7 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
}
@Test
- public void testDatabaseMetaDataTables() throws Exception
+ public void testDatabaseMetaDataTables() throws SQLException
{
final DatabaseMetaData metaData = client.getMetaData();
Assert.assertEquals(
@@ -485,7 +510,7 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
}
@Test
- public void testDatabaseMetaDataTablesAsSuperuser() throws Exception
+ public void testDatabaseMetaDataTablesAsSuperuser() throws SQLException
{
final DatabaseMetaData metaData = superuserClient.getMetaData();
Assert.assertEquals(
@@ -559,7 +584,7 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
}
@Test
- public void testDatabaseMetaDataColumns() throws Exception
+ public void testDatabaseMetaDataColumns() throws SQLException
{
final DatabaseMetaData metaData = client.getMetaData();
Assert.assertEquals(
@@ -637,7 +662,7 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
}
@Test
- public void testDatabaseMetaDataColumnsOnForbiddenDatasource() throws
Exception
+ public void testDatabaseMetaDataColumnsOnForbiddenDatasource() throws
SQLException
{
final DatabaseMetaData metaData = client.getMetaData();
Assert.assertEquals(
@@ -650,7 +675,7 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
}
@Test
- public void testDatabaseMetaDataColumnsWithSuperuser() throws Exception
+ public void testDatabaseMetaDataColumnsWithSuperuser() throws SQLException
{
final DatabaseMetaData metaData = superuserClient.getMetaData();
Assert.assertEquals(
@@ -719,9 +744,8 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
);
}
-
@Test(timeout = 90_000L)
- public void testConcurrentQueries() throws Exception
+ public void testConcurrentQueries() throws InterruptedException,
ExecutionException
{
final List<ListenableFuture<Integer>> futures = new ArrayList<>();
final ListeningExecutorService exec = MoreExecutors.listeningDecorator(
@@ -749,23 +773,24 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
for (int i = 0; i < 2000; i++) {
Assert.assertEquals(i + 6, (int) integers.get(i));
}
+ exec.shutdown();
}
@Test
- public void testTooManyStatements() throws Exception
+ public void testTooManyStatements() throws SQLException
{
- final Statement statement1 = client.createStatement();
- final Statement statement2 = client.createStatement();
- final Statement statement3 = client.createStatement();
- final Statement statement4 = client.createStatement();
+ client.createStatement();
+ client.createStatement();
+ client.createStatement();
+ client.createStatement();
expectedException.expect(AvaticaClientRuntimeException.class);
- expectedException.expectMessage("Too many open statements, limit is[4]");
- final Statement statement5 = client.createStatement();
+ expectedException.expectMessage("Too many open statements, limit is [4]");
+ client.createStatement();
}
@Test
- public void testNotTooManyStatementsWhenYouCloseThem() throws Exception
+ public void testNotTooManyStatementsWhenYouCloseThem() throws SQLException
{
client.createStatement().close();
client.createStatement().close();
@@ -777,54 +802,75 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
client.createStatement().close();
client.createStatement().close();
client.createStatement().close();
-
- Assert.assertTrue(true);
}
+ /**
+ * JDBC allows sequential reuse of statements. A statement is not closed
until
+ * the application closes it (or the connection), but the statement's result
set
+ * is closed on each EOF.
+ */
@Test
- public void testNotTooManyStatementsWhenYouFullyIterateThem() throws
Exception
+ public void testManyUsesOfTheSameStatement() throws SQLException
{
- for (int i = 0; i < 50; i++) {
- final ResultSet resultSet = client.createStatement().executeQuery(
- "SELECT COUNT(*) AS cnt FROM druid.foo"
- );
- Assert.assertEquals(
- ImmutableList.of(
- ImmutableMap.of("cnt", 6L)
- ),
- getRows(resultSet)
- );
+ try (Statement statement = client.createStatement()) {
+ for (int i = 0; i < 50; i++) {
+ final ResultSet resultSet = statement.executeQuery(
+ "SELECT COUNT(*) AS cnt FROM druid.foo"
+ );
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of("cnt", 6L)
+ ),
+ getRows(resultSet)
+ );
+ }
}
-
- Assert.assertTrue(true);
}
+ /**
+ * Statements should not be closed if then encounter an error. The {@code
ResultSet}
+ * can be closed, but not the statement.
+ */
@Test
- public void testNotTooManyStatementsWhenTheyThrowErrors() throws Exception
+ public void tesErrorsDoNotCloseStatements() throws SQLException
{
- for (int i = 0; i < 50; i++) {
- Exception thrown = null;
+ try (Statement statement = client.createStatement()) {
try {
- client.createStatement().executeQuery("SELECT SUM(nonexistent) FROM
druid.foo");
+ statement.executeQuery("SELECT SUM(nonexistent) FROM druid.foo");
+ Assert.fail();
}
catch (Exception e) {
- thrown = e;
+ // Expected
}
- Assert.assertNotNull(thrown);
-
- final ResultSet resultSet =
client.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo");
+ final ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) AS
cnt FROM druid.foo");
Assert.assertEquals(
ImmutableList.of(ImmutableMap.of("cnt", 6L)),
getRows(resultSet)
);
}
+ }
- Assert.assertTrue(true);
+ /**
+ * Since errors do not close statements, they must be closed by the
application,
+ * preferably in a try-with-resources block.
+ */
+ @Test
+ public void testNotTooManyStatementsWhenClosed()
+ {
+ for (int i = 0; i < 50; i++) {
+ try (Statement statement = client.createStatement()) {
+ statement.executeQuery("SELECT SUM(nonexistent) FROM druid.foo");
+ Assert.fail();
+ }
+ catch (Exception e) {
+ // Expected
+ }
+ }
}
@Test
- public void testAutoReconnectOnNoSuchConnection() throws Exception
+ public void testAutoReconnectOnNoSuchConnection() throws SQLException
{
for (int i = 0; i < 50; i++) {
final ResultSet resultSet =
client.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo");
@@ -834,12 +880,10 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
);
druidMeta.closeAllConnections();
}
-
- Assert.assertTrue(true);
}
@Test
- public void testTooManyConnections() throws Exception
+ public void testTooManyConnections() throws SQLException
{
client.createStatement();
clientLosAngeles.createStatement();
@@ -849,23 +893,16 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
expectedException.expect(AvaticaClientRuntimeException.class);
expectedException.expectMessage("Too many connections");
- final Connection connection5 = DriverManager.getConnection(url);
+ DriverManager.getConnection(url);
}
@Test
- public void testNotTooManyConnectionsWhenTheyAreEmpty() throws Exception
+ public void testNotTooManyConnectionsWhenTheyAreEmpty() throws SQLException
{
- final Connection connection1 = DriverManager.getConnection(url);
- connection1.createStatement().close();
-
- final Connection connection2 = DriverManager.getConnection(url);
- connection2.createStatement().close();
-
- final Connection connection3 = DriverManager.getConnection(url);
- connection3.createStatement().close();
-
- final Connection connection4 = DriverManager.getConnection(url);
- Assert.assertTrue(true);
+ for (int i = 0; i < 4; i++) {
+ try (Connection connection = DriverManager.getConnection(url)) {
+ }
+ }
}
@Test
@@ -957,7 +994,6 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
);
}
-
@Test
public void testMinRowsPerFrame() throws Exception
{
@@ -1031,7 +1067,7 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
String smallFrameUrl = this.getJdbcConnectionString(port);
Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl,
"regularUser", "druid");
- // use a prepared statement because avatica currently ignores fetchSize on
the initial fetch of a Statement
+ // use a prepared statement because Avatica currently ignores fetchSize on
the initial fetch of a Statement
PreparedStatement statement = smallFrameClient.prepareStatement("SELECT
dim1 FROM druid.foo");
// set a fetch size below the minimum configured threshold
statement.setFetchSize(2);
@@ -1053,12 +1089,14 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
}
@Test
- @SuppressWarnings("unchecked")
- public void testSqlRequestLog() throws Exception
+ public void testSqlRequestLog() throws SQLException
{
// valid sql
+ testRequestLogger.clear();
for (int i = 0; i < 3; i++) {
- client.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM
druid.foo");
+ try (Statement stmt = client.createStatement()) {
+ stmt.executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo");
+ }
}
Assert.assertEquals(3, testRequestLogger.getSqlQueryLogs().size());
for (RequestLogLine logLine : testRequestLogger.getSqlQueryLogs()) {
@@ -1071,107 +1109,188 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
// invalid sql
testRequestLogger.clear();
- try {
- client.createStatement().executeQuery("SELECT notexist FROM druid.foo");
+ try (Statement stmt = client.createStatement()) {
+ stmt.executeQuery("SELECT notexist FROM druid.foo");
Assert.fail("invalid SQL should throw SQLException");
}
catch (SQLException e) {
+ // Expected
}
Assert.assertEquals(1, testRequestLogger.getSqlQueryLogs().size());
- final Map<String, Object> stats =
testRequestLogger.getSqlQueryLogs().get(0).getQueryStats().getStats();
- Assert.assertEquals(false, stats.get("success"));
- Assert.assertEquals("regularUser", stats.get("identity"));
- Assert.assertTrue(stats.containsKey("exception"));
+ {
+ final Map<String, Object> stats =
testRequestLogger.getSqlQueryLogs().get(0).getQueryStats().getStats();
+ Assert.assertEquals(false, stats.get("success"));
+ Assert.assertEquals("regularUser", stats.get("identity"));
+ Assert.assertTrue(stats.containsKey("exception"));
+ }
// unauthorized sql
testRequestLogger.clear();
- try {
- client.createStatement().executeQuery("SELECT count(*) FROM
druid.forbiddenDatasource");
+ try (Statement stmt = client.createStatement()) {
+ stmt.executeQuery("SELECT count(*) FROM druid.forbiddenDatasource");
Assert.fail("unauthorzed SQL should throw SQLException");
}
catch (SQLException e) {
+ // Expected
}
+ // SqlLifecycle does not allow logging for security failures.
Assert.assertEquals(0, testRequestLogger.getSqlQueryLogs().size());
}
@Test
- public void testParameterBinding() throws Exception
+ public void testSqlRequestLogPrepared() throws SQLException
{
- PreparedStatement statement = client.prepareStatement("SELECT COUNT(*) AS
cnt FROM druid.foo WHERE dim1 = ? OR dim1 = ?");
- statement.setString(1, "abc");
- statement.setString(2, "def");
- final ResultSet resultSet = statement.executeQuery();
- final List<Map<String, Object>> rows = getRows(resultSet);
- Assert.assertEquals(
- ImmutableList.of(
- ImmutableMap.of("cnt", 2L)
- ),
- rows
- );
+ // valid sql
+ testRequestLogger.clear();
+ for (int i = 0; i < 3; i++) {
+ try (PreparedStatement stmt = client.prepareStatement("SELECT COUNT(*)
AS cnt FROM druid.foo")) {
+ stmt.execute();
+ }
+ }
+ Assert.assertEquals(6, testRequestLogger.getSqlQueryLogs().size());
+ for (RequestLogLine logLine : testRequestLogger.getSqlQueryLogs()) {
+ final Map<String, Object> stats = logLine.getQueryStats().getStats();
+ Assert.assertEquals(true, stats.get("success"));
+ Assert.assertEquals("regularUser", stats.get("identity"));
+ Assert.assertTrue(stats.containsKey("sqlQuery/time"));
+ Assert.assertTrue(stats.containsKey("sqlQuery/bytes"));
+ }
+
+ // invalid sql
+ testRequestLogger.clear();
+ try (PreparedStatement stmt = client.prepareStatement("SELECT notexist
FROM druid.foo")) {
+ Assert.fail("invalid SQL should throw SQLException");
+ }
+ catch (SQLException e) {
+ // Expected
+ }
+ Assert.assertEquals(1, testRequestLogger.getSqlQueryLogs().size());
+ {
+ final Map<String, Object> stats =
testRequestLogger.getSqlQueryLogs().get(0).getQueryStats().getStats();
+ Assert.assertEquals(false, stats.get("success"));
+ Assert.assertEquals("regularUser", stats.get("identity"));
+ Assert.assertTrue(stats.containsKey("exception"));
+ }
+
+ // unauthorized sql
+ testRequestLogger.clear();
+ try (PreparedStatement stmt = client.prepareStatement("SELECT count(*)
FROM druid.forbiddenDatasource")) {
+ Assert.fail("unauthorzed SQL should throw SQLException");
+ }
+ catch (SQLException e) {
+ // Expected
+ }
+ // SqlLifecycle does not allow logging for security failures.
+ Assert.assertEquals(0, testRequestLogger.getSqlQueryLogs().size());
}
@Test
- public void testSysTableParameterBindingRegularUser() throws Exception
+ public void testParameterBinding() throws SQLException
{
- PreparedStatement statement =
- client.prepareStatement("SELECT COUNT(*) AS cnt FROM sys.servers WHERE
servers.host = ?");
- statement.setString(1, "dummy");
-
- Assert.assertThrows(
- "Insufficient permission to view servers",
- AvaticaSqlException.class,
- statement::executeQuery
- );
+ try (PreparedStatement statement = client.prepareStatement("SELECT
COUNT(*) AS cnt FROM druid.foo WHERE dim1 = ? OR dim1 = ?")) {
+ statement.setString(1, "abc");
+ statement.setString(2, "def");
+ final ResultSet resultSet = statement.executeQuery();
+ final List<Map<String, Object>> rows = getRows(resultSet);
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of("cnt", 2L)
+ ),
+ rows
+ );
+ }
}
@Test
- public void testSysTableParameterBindingSuperUser() throws Exception
+ public void testSysTableParameterBindingRegularUser() throws SQLException
{
- PreparedStatement statement =
- superuserClient.prepareStatement("SELECT COUNT(*) AS cnt FROM
sys.servers WHERE servers.host = ?");
- statement.setString(1, "dummy");
- final ResultSet resultSet = statement.executeQuery();
- final List<Map<String, Object>> rows = getRows(resultSet);
- Assert.assertEquals(
- ImmutableList.of(
- ImmutableMap.of("cnt", 1L)
- ),
- rows
- );
+ try (PreparedStatement statement =
+ client.prepareStatement("SELECT COUNT(*) AS cnt FROM sys.servers WHERE
servers.host = ?")) {
+ statement.setString(1, "dummy");
+
+ Assert.assertThrows(
+ "Insufficient permission to view servers",
+ AvaticaSqlException.class,
+ statement::executeQuery
+ );
+ }
}
@Test
- public void testExtendedCharacters() throws Exception
+ public void testSysTableParameterBindingSuperUser() throws SQLException
{
- final ResultSet resultSet = client.createStatement().executeQuery(
- "SELECT COUNT(*) AS cnt FROM druid.lotsocolumns WHERE
dimMultivalEnumerated = 'ㅑ ㅓ ㅕ ㅗ ㅛ ㅜ ㅠ ㅡ ㅣ'"
- );
- final List<Map<String, Object>> rows = getRows(resultSet);
- Assert.assertEquals(
- ImmutableList.of(
- ImmutableMap.of("cnt", 1L)
- ),
- rows
- );
+ try (PreparedStatement statement =
+ superuserClient.prepareStatement("SELECT COUNT(*) AS cnt FROM
sys.servers WHERE servers.host = ?")) {
+ statement.setString(1, "dummy");
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of("cnt", 1L)
+ ),
+ getRows(statement.executeQuery())
+ );
+ }
+ }
+ @Test
+ public void testExecuteMany() throws SQLException
+ {
+ try (PreparedStatement statement =
+ superuserClient.prepareStatement("SELECT COUNT(*) AS cnt FROM
sys.servers WHERE servers.host = ?")) {
+ statement.setString(1, "dummy");
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of("cnt", 1L)
+ ),
+ getRows(statement.executeQuery())
+ );
+ statement.setString(1, "foo");
+ Assert.assertEquals(
+ Collections.emptyList(),
+ getRows(statement.executeQuery())
+ );
+ statement.setString(1, "dummy");
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of("cnt", 1L)
+ ),
+ getRows(statement.executeQuery())
+ );
+ }
+ }
- PreparedStatement statement = client.prepareStatement(
- "SELECT COUNT(*) AS cnt FROM druid.lotsocolumns WHERE
dimMultivalEnumerated = ?"
- );
- statement.setString(1, "ㅑ ㅓ ㅕ ㅗ ㅛ ㅜ ㅠ ㅡ ㅣ");
- final ResultSet resultSet2 = statement.executeQuery();
- final List<Map<String, Object>> rows2 = getRows(resultSet2);
- Assert.assertEquals(
- ImmutableList.of(
- ImmutableMap.of("cnt", 1L)
- ),
- rows
- );
- Assert.assertEquals(rows, rows2);
+ @Test
+ public void testExtendedCharacters() throws SQLException
+ {
+ try (Statement stmt = client.createStatement()) {
+ final ResultSet resultSet = stmt.executeQuery(
+ "SELECT COUNT(*) AS cnt FROM druid.lotsocolumns WHERE
dimMultivalEnumerated = 'ㅑ ㅓ ㅕ ㅗ ㅛ ㅜ ㅠ ㅡ ㅣ'"
+ );
+ final List<Map<String, Object>> rows = getRows(resultSet);
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of("cnt", 1L)
+ ),
+ rows
+ );
+ }
+
+ try (PreparedStatement statement = client.prepareStatement(
+ "SELECT COUNT(*) AS cnt FROM druid.lotsocolumns WHERE
dimMultivalEnumerated = ?")) {
+ statement.setString(1, "ㅑ ㅓ ㅕ ㅗ ㅛ ㅜ ㅠ ㅡ ㅣ");
+ final ResultSet resultSet2 = statement.executeQuery();
+ final List<Map<String, Object>> rows = getRows(resultSet2);
+ Assert.assertEquals(
+ ImmutableList.of(
+ ImmutableMap.of("cnt", 1L)
+ ),
+ rows
+ );
+ Assert.assertEquals(rows, rows);
+ }
}
@Test
- public void testEscapingForGetColumns() throws Exception
+ public void testEscapingForGetColumns() throws SQLException
{
final DatabaseMetaData metaData = client.getMetaData();
@@ -1325,7 +1444,7 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
}
@Test
- public void testEscapingForGetTables() throws Exception
+ public void testEscapingForGetTables() throws SQLException
{
final DatabaseMetaData metaData = client.getMetaData();
@@ -1374,36 +1493,51 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
);
}
-
@Test
- public void testArrayStuffs() throws Exception
+ public void testArrayStuff() throws SQLException
{
- PreparedStatement statement = client.prepareStatement(
- "SELECT ARRAY_AGG(dim2) AS arr1, ARRAY_AGG(l1) AS arr2, ARRAY_AGG(d1)
AS arr3, ARRAY_AGG(f1) AS arr4 FROM druid.numfoo"
- );
- final ResultSet resultSet = statement.executeQuery();
- final List<Map<String, Object>> rows = getRows(resultSet);
- Assert.assertEquals(1, rows.size());
- Assert.assertTrue(rows.get(0).containsKey("arr1"));
- Assert.assertTrue(rows.get(0).containsKey("arr2"));
- Assert.assertTrue(rows.get(0).containsKey("arr3"));
- Assert.assertTrue(rows.get(0).containsKey("arr4"));
- if (NullHandling.sqlCompatible()) {
- Assert.assertArrayEquals(new Object[]{"a", null, "", "a", "abc", null},
(Object[]) rows.get(0).get("arr1"));
- Assert.assertArrayEquals(new Object[]{7L, 325323L, 0L, null, null,
null}, (Object[]) rows.get(0).get("arr2"));
- Assert.assertArrayEquals(new Object[]{1.0, 1.7, 0.0, null, null, null},
(Object[]) rows.get(0).get("arr3"));
- Assert.assertArrayEquals(new Object[]{1.0f, 0.1f, 0.0f, null, null,
null}, (Object[]) rows.get(0).get("arr4"));
- } else {
- Assert.assertArrayEquals(new Object[]{"a", null, null, "a", "abc",
null}, (Object[]) rows.get(0).get("arr1"));
- Assert.assertArrayEquals(new Object[]{7L, 325323L, 0L, 0L, 0L, 0L},
(Object[]) rows.get(0).get("arr2"));
- Assert.assertArrayEquals(new Object[]{1.0, 1.7, 0.0, 0.0, 0.0, 0.0},
(Object[]) rows.get(0).get("arr3"));
- Assert.assertArrayEquals(new Object[]{1.0f, 0.1f, 0.0f, 0.0f, 0.0f,
0.0f}, (Object[]) rows.get(0).get("arr4"));
+ try (PreparedStatement statement = client.prepareStatement(
+ "SELECT ARRAY_AGG(dim2) AS arr1, ARRAY_AGG(l1) AS arr2, ARRAY_AGG(d1)
AS arr3, ARRAY_AGG(f1) AS arr4 FROM druid.numfoo")) {
+ final ResultSet resultSet = statement.executeQuery();
+ final List<Map<String, Object>> rows = getRows(resultSet);
+ Assert.assertEquals(1, rows.size());
+ Assert.assertTrue(rows.get(0).containsKey("arr1"));
+ Assert.assertTrue(rows.get(0).containsKey("arr2"));
+ Assert.assertTrue(rows.get(0).containsKey("arr3"));
+ Assert.assertTrue(rows.get(0).containsKey("arr4"));
+ if (NullHandling.sqlCompatible()) {
+ Assert.assertArrayEquals(new Object[]{"a", null, "", "a", "abc",
null}, (Object[]) rows.get(0).get("arr1"));
+ Assert.assertArrayEquals(new Object[]{7L, 325323L, 0L, null, null,
null}, (Object[]) rows.get(0).get("arr2"));
+ Assert.assertArrayEquals(new Object[]{1.0, 1.7, 0.0, null, null,
null}, (Object[]) rows.get(0).get("arr3"));
+ Assert.assertArrayEquals(new Object[]{1.0f, 0.1f, 0.0f, null, null,
null}, (Object[]) rows.get(0).get("arr4"));
+ } else {
+ Assert.assertArrayEquals(new Object[]{"a", null, null, "a", "abc",
null}, (Object[]) rows.get(0).get("arr1"));
+ Assert.assertArrayEquals(new Object[]{7L, 325323L, 0L, 0L, 0L, 0L},
(Object[]) rows.get(0).get("arr2"));
+ Assert.assertArrayEquals(new Object[]{1.0, 1.7, 0.0, 0.0, 0.0, 0.0},
(Object[]) rows.get(0).get("arr3"));
+ Assert.assertArrayEquals(new Object[]{1.0f, 0.1f, 0.0f, 0.0f, 0.0f,
0.0f}, (Object[]) rows.get(0).get("arr4"));
+ }
}
}
- protected abstract String getJdbcConnectionString(int port);
+ // Default implementation is for JSON to allow debugging of tests.
+ protected String getJdbcConnectionString(final int port)
+ {
+ return StringUtils.format(
+ "jdbc:avatica:remote:url=http://127.0.0.1:%d%s",
+ port,
+ DruidAvaticaJsonHandler.AVATICA_PATH
+ );
+ }
- protected abstract AbstractAvaticaHandler getAvaticaHandler(DruidMeta
druidMeta);
+ // Default implementation is for JSON to allow debugging of tests.
+ protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta)
+ {
+ return new DruidAvaticaJsonHandler(
+ druidMeta,
+ new DruidNode("dummy", "dummy", false, 1, null, true, false),
+ new AvaticaMonitor()
+ );
+ }
private static List<Map<String, Object>> getRows(final ResultSet resultSet)
throws SQLException
{
@@ -1437,6 +1571,7 @@ public abstract class DruidAvaticaHandlerTest extends
CalciteTestBase
}
}
+ @SafeVarargs
private static Map<String, Object> row(final Pair<String, ?>... entries)
{
final Map<String, Object> m = new HashMap<>();
diff --git
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandlerTest.java
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandlerTest.java
deleted file mode 100644
index 1e60905bfb..0000000000
---
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandlerTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.sql.avatica;
-
-import org.apache.calcite.avatica.server.AbstractAvaticaHandler;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.server.DruidNode;
-
-public class DruidAvaticaJsonHandlerTest extends DruidAvaticaHandlerTest
-{
- @Override
- protected String getJdbcConnectionString(final int port)
- {
- return StringUtils.format(
- "jdbc:avatica:remote:url=http://127.0.0.1:%d%s",
- port,
- DruidAvaticaJsonHandler.AVATICA_PATH
- );
- }
-
- @Override
- protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta)
- {
- return new DruidAvaticaJsonHandler(
- druidMeta,
- new DruidNode("dummy", "dummy", false, 1, null, true, false),
- new AvaticaMonitor()
- );
- }
-}
diff --git
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
index c6eeb83391..fba413d990 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
@@ -20,20 +20,23 @@
package org.apache.druid.sql.avatica;
import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.remote.TypedValue;
+import org.apache.calcite.tools.RelConversionException;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExprMacroTable;
-import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AllowAllAuthenticator;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -58,6 +61,15 @@ import java.util.List;
public class DruidStatementTest extends CalciteTestBase
{
+ private static String SUB_QUERY_WITH_ORDER_BY =
+ "select T20.F13 as F22\n"
+ + "from (SELECT DISTINCT dim1 as F13 FROM druid.foo T10) T20\n"
+ + "order by T20.F13 ASC";
+ private static String SELECT_FROM_FOO =
+ "SELECT __time, cnt, dim1, dim2, m1 FROM druid.foo";
+ private static String SELECT_STAR_FROM_FOO =
+ "SELECT * FROM druid.foo";
+
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -82,6 +94,7 @@ public class DruidStatementTest extends CalciteTestBase
private SpecificSegmentsQuerySegmentWalker walker;
private SqlLifecycleFactory sqlLifecycleFactory;
+ private DruidConnection conn;
@Before
public void setUp() throws Exception
@@ -103,89 +116,200 @@ public class DruidStatementTest extends CalciteTestBase
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
);
- this.sqlLifecycleFactory =
CalciteTests.createSqlLifecycleFactory(plannerFactory);
+ sqlLifecycleFactory =
CalciteTests.createSqlLifecycleFactory(plannerFactory);
+ conn = new DruidConnection("dummy", 4, ImmutableMap.of(),
ImmutableMap.of());
}
@After
public void tearDown() throws Exception
{
+ conn.close();
walker.close();
walker = null;
}
+ //-----------------------------------------------------------------
+ // Druid JDBC Statement
+ //
+ // The JDBC Statement class starts "empty", then allows executing
+ // one statement at a time. Executing a second automatically closes
+ // the result set from the first. Each statement takes a new query.
+ // Parameters are not generally used in this pattern.
+
+ private DruidJdbcStatement jdbcStatement()
+ {
+ return new DruidJdbcStatement(
+ conn,
+ 0,
+ sqlLifecycleFactory
+ );
+ }
+
@Test
- public void testSignature()
+ public void testSubQueryWithOrderByDirect() throws RelConversionException
{
- final String sql = "SELECT * FROM druid.foo";
- try (final DruidStatement statement = statement(sql)) {
- // Check signature.
- final Meta.Signature signature = statement.getSignature();
- Assert.assertEquals(Meta.CursorFactory.ARRAY, signature.cursorFactory);
- Assert.assertEquals(Meta.StatementType.SELECT, signature.statementType);
- Assert.assertEquals(sql, signature.sql);
+ SqlQueryPlus queryPlus = new SqlQueryPlus(
+ SUB_QUERY_WITH_ORDER_BY,
+ null,
+ null,
+ AllowAllAuthenticator.ALLOW_ALL_RESULT
+ );
+ try (final DruidJdbcStatement statement = jdbcStatement()) {
+ // First frame, ask for all rows.
+ statement.execute(queryPlus, -1);
+ Meta.Frame frame =
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
Assert.assertEquals(
- Lists.newArrayList(
- Lists.newArrayList("__time", "TIMESTAMP", "java.lang.Long"),
- Lists.newArrayList("dim1", "VARCHAR", "java.lang.String"),
- Lists.newArrayList("dim2", "VARCHAR", "java.lang.String"),
- Lists.newArrayList("dim3", "VARCHAR", "java.lang.String"),
- Lists.newArrayList("cnt", "BIGINT", "java.lang.Number"),
- Lists.newArrayList("m1", "FLOAT", "java.lang.Float"),
- Lists.newArrayList("m2", "DOUBLE", "java.lang.Double"),
- Lists.newArrayList("unique_dim1", "OTHER", "java.lang.Object")
- ),
- Lists.transform(
- signature.columns,
- new Function<ColumnMetaData, List<String>>()
- {
- @Override
- public List<String> apply(final ColumnMetaData columnMetaData)
- {
- return Lists.newArrayList(
- columnMetaData.label,
- columnMetaData.type.name,
- columnMetaData.type.rep.clazz.getName()
- );
- }
- }
- )
+ subQueryWithOrderByResults(),
+ frame
);
+ Assert.assertTrue(statement.isDone());
}
}
@Test
- public void testSubQueryWithOrderBy()
+ public void testFetchPastEOFDirect() throws RelConversionException
{
- final String sql = "select T20.F13 as F22 from (SELECT DISTINCT dim1 as
F13 FROM druid.foo T10) T20 order by T20.F13 ASC";
- try (final DruidStatement statement = statement(sql)) {
+ SqlQueryPlus queryPlus = new SqlQueryPlus(
+ SUB_QUERY_WITH_ORDER_BY,
+ null,
+ null,
+ AllowAllAuthenticator.ALLOW_ALL_RESULT
+ );
+ try (final DruidJdbcStatement statement = jdbcStatement()) {
// First frame, ask for all rows.
- Meta.Frame frame =
statement.execute(Collections.emptyList()).nextFrame(DruidStatement.START_OFFSET,
6);
+ statement.execute(queryPlus, -1);
+ Meta.Frame frame =
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
Assert.assertEquals(
- Meta.Frame.create(
- 0,
- true,
- Lists.newArrayList(
- new Object[]{""},
- new Object[]{"1"},
- new Object[]{"10.1"},
- new Object[]{"2"},
- new Object[]{"abc"},
- new Object[]{"def"}
- )
- ),
+ subQueryWithOrderByResults(),
frame
);
Assert.assertTrue(statement.isDone());
+ try {
+ statement.nextFrame(6, 6);
+ Assert.fail();
+ }
+ catch (Exception e) {
+ // Expected: can't work with an auto-closed result set.
+ }
}
}
+ /**
+ * Ensure an error is thrown if the execution step is skipped.
+ */
@Test
- public void testSelectAllInFirstFrame()
+ public void testSkipExecuteDirect()
{
- final String sql = "SELECT __time, cnt, dim1, dim2, m1 FROM druid.foo";
- try (final DruidStatement statement = statement(sql)) {
+ try (final DruidJdbcStatement statement = jdbcStatement()) {
+ // Error: no call to execute;
+ statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+ Assert.fail();
+ }
+ catch (Exception e) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void testSignatureDirect() throws RelConversionException
+ {
+ SqlQueryPlus queryPlus = new SqlQueryPlus(
+ SELECT_STAR_FROM_FOO,
+ null,
+ null,
+ AllowAllAuthenticator.ALLOW_ALL_RESULT
+ );
+ try (final DruidJdbcStatement statement = jdbcStatement()) {
+ // Check signature.
+ statement.execute(queryPlus, -1);
+ verifySignature(statement.getSignature());
+ }
+ }
+
+ /**
+ * Ensure an error is thrown if the client attempts to fetch from a
+ * statement after its result set is closed.
+ */
+ @Test
+ public void testFetchAfterResultCloseDirect()
+ {
+ SqlQueryPlus queryPlus = new SqlQueryPlus(
+ SUB_QUERY_WITH_ORDER_BY,
+ null,
+ null,
+ AllowAllAuthenticator.ALLOW_ALL_RESULT
+ );
+ try (final DruidJdbcStatement statement = jdbcStatement()) {
// First frame, ask for all rows.
- Meta.Frame frame =
statement.execute(Collections.emptyList()).nextFrame(DruidStatement.START_OFFSET,
6);
+ statement.execute(queryPlus, -1);
+ statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+ statement.closeResultSet();
+ statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+ Assert.fail();
+ }
+ catch (Exception e) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void testSubQueryWithOrderByDirectTwice() throws
RelConversionException
+ {
+ SqlQueryPlus queryPlus = new SqlQueryPlus(
+ SUB_QUERY_WITH_ORDER_BY,
+ null,
+ null,
+ AllowAllAuthenticator.ALLOW_ALL_RESULT
+ );
+ try (final DruidJdbcStatement statement = jdbcStatement()) {
+ statement.execute(queryPlus, -1);
+ Meta.Frame frame =
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+ Assert.assertEquals(
+ subQueryWithOrderByResults(),
+ frame
+ );
+
+ // Do it again. JDBC says we can reuse statements sequentially.
+ Assert.assertTrue(statement.isDone());
+ statement.execute(queryPlus, -1);
+ frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+ Assert.assertEquals(
+ subQueryWithOrderByResults(),
+ frame
+ );
+ Assert.assertTrue(statement.isDone());
+ }
+ }
+
+ private Meta.Frame subQueryWithOrderByResults()
+ {
+ return Meta.Frame.create(
+ 0,
+ true,
+ Lists.newArrayList(
+ new Object[]{""},
+ new Object[]{"1"},
+ new Object[]{"10.1"},
+ new Object[]{"2"},
+ new Object[]{"abc"},
+ new Object[]{"def"}
+ )
+ );
+ }
+
+ @Test
+ public void testSelectAllInFirstFrameDirect() throws RelConversionException
+ {
+ SqlQueryPlus queryPlus = new SqlQueryPlus(
+ SELECT_FROM_FOO,
+ null,
+ null,
+ AllowAllAuthenticator.ALLOW_ALL_RESULT
+ );
+ try (final DruidJdbcStatement statement = jdbcStatement()) {
+ // First frame, ask for all rows.
+ statement.execute(queryPlus, -1);
+ Meta.Frame frame =
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
Assert.assertEquals(
Meta.Frame.create(
0,
@@ -211,59 +335,309 @@ public class DruidStatementTest extends CalciteTestBase
}
}
+ /**
+ * Test results spread over two frames. Also checks various state-related
+ * methods.
+ * @throws RelConversionException
+ */
@Test
- public void testSelectSplitOverTwoFrames()
+ public void testSelectSplitOverTwoFramesDirect() throws
RelConversionException
{
- final String sql = "SELECT __time, cnt, dim1, dim2, m1 FROM druid.foo";
- try (final DruidStatement statement = statement(sql)) {
+ SqlQueryPlus queryPlus = new SqlQueryPlus(
+ SELECT_FROM_FOO,
+ null,
+ null,
+ AllowAllAuthenticator.ALLOW_ALL_RESULT
+ );
+ try (final DruidJdbcStatement statement = jdbcStatement()) {
+
// First frame, ask for 2 rows.
- Meta.Frame frame =
statement.execute(Collections.emptyList()).nextFrame(DruidStatement.START_OFFSET,
2);
+ statement.execute(queryPlus, -1);
+ Assert.assertEquals(0, statement.getCurrentOffset());
+ Assert.assertFalse(statement.isDone());
+ Meta.Frame frame =
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 2);
Assert.assertEquals(
- Meta.Frame.create(
- 0,
- false,
- Lists.newArrayList(
- new Object[]{DateTimes.of("2000-01-01").getMillis(), 1L, "",
"a", 1.0f},
- new Object[]{
- DateTimes.of("2000-01-02").getMillis(),
- 1L,
- "10.1",
- NullHandling.defaultStringValue(),
- 2.0f
- }
- )
- ),
+ firstFrameResults(),
frame
);
Assert.assertFalse(statement.isDone());
+ Assert.assertEquals(2, statement.getCurrentOffset());
// Last frame, ask for all remaining rows.
frame = statement.nextFrame(2, 10);
Assert.assertEquals(
- Meta.Frame.create(
- 2,
- true,
- Lists.newArrayList(
- new Object[]{DateTimes.of("2000-01-03").getMillis(), 1L,
"2", "", 3.0f},
- new Object[]{DateTimes.of("2001-01-01").getMillis(), 1L,
"1", "a", 4.0f},
- new Object[]{DateTimes.of("2001-01-02").getMillis(), 1L,
"def", "abc", 5.0f},
- new Object[]{DateTimes.of("2001-01-03").getMillis(), 1L,
"abc", NullHandling.defaultStringValue(), 6.0f}
- )
- ),
+ secondFrameResults(),
+ frame
+ );
+ Assert.assertTrue(statement.isDone());
+ }
+ }
+
+ /**
+ * Verify that JDBC automatically closes the first result set when we
+ * open a second for the same statement.
+ * @throws RelConversionException
+ */
+ @Test
+ public void testTwoFramesAutoCloseDirect() throws RelConversionException
+ {
+ SqlQueryPlus queryPlus = new SqlQueryPlus(
+ SELECT_FROM_FOO,
+ null,
+ null,
+ AllowAllAuthenticator.ALLOW_ALL_RESULT
+ );
+ try (final DruidJdbcStatement statement = jdbcStatement()) {
+ // First frame, ask for 2 rows.
+ statement.execute(queryPlus, -1);
+ Meta.Frame frame =
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 2);
+ Assert.assertEquals(
+ firstFrameResults(),
+ frame
+ );
+ Assert.assertFalse(statement.isDone());
+
+ // Do it again. Closes the prior result set.
+ statement.execute(queryPlus, -1);
+ frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 2);
+ Assert.assertEquals(
+ firstFrameResults(),
+ frame
+ );
+ Assert.assertFalse(statement.isDone());
+
+ // Last frame, ask for all remaining rows.
+ frame = statement.nextFrame(2, 10);
+ Assert.assertEquals(
+ secondFrameResults(),
frame
);
Assert.assertTrue(statement.isDone());
}
}
- private DruidStatement statement(String sql)
+ /**
+ * Test that closing a statement with pending results automatically
+ * closes the underlying result set.
+ * @throws RelConversionException
+ */
+ @Test
+ public void testTwoFramesCloseWithResultSetDirect() throws
RelConversionException
{
- return new DruidStatement(
- "",
+ SqlQueryPlus queryPlus = new SqlQueryPlus(
+ SELECT_FROM_FOO,
+ null,
+ null,
+ AllowAllAuthenticator.ALLOW_ALL_RESULT
+ );
+ try (final DruidJdbcStatement statement = jdbcStatement()) {
+ // First frame, ask for 2 rows.
+ statement.execute(queryPlus, -1);
+ Meta.Frame frame =
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 2);
+ Assert.assertEquals(
+ firstFrameResults(),
+ frame
+ );
+ Assert.assertFalse(statement.isDone());
+
+ // Leave result set open; close statement.
+ }
+ }
+
+ private Meta.Frame firstFrameResults()
+ {
+ return Meta.Frame.create(
+ 0,
+ false,
+ Lists.newArrayList(
+ new Object[]{DateTimes.of("2000-01-01").getMillis(), 1L, "", "a",
1.0f},
+ new Object[]{
+ DateTimes.of("2000-01-02").getMillis(),
+ 1L,
+ "10.1",
+ NullHandling.defaultStringValue(),
+ 2.0f
+ }
+ )
+ );
+ }
+
+ private Meta.Frame secondFrameResults()
+ {
+ return Meta.Frame.create(
+ 2,
+ true,
+ Lists.newArrayList(
+ new Object[]{DateTimes.of("2000-01-03").getMillis(), 1L, "2", "",
3.0f},
+ new Object[]{DateTimes.of("2001-01-01").getMillis(), 1L, "1", "a",
4.0f},
+ new Object[]{DateTimes.of("2001-01-02").getMillis(), 1L, "def",
"abc", 5.0f},
+ new Object[]{DateTimes.of("2001-01-03").getMillis(), 1L, "abc",
NullHandling.defaultStringValue(), 6.0f}
+ )
+ );
+ }
+
+ @SuppressWarnings("unchecked")
+ private void verifySignature(Meta.Signature signature)
+ {
+ Assert.assertEquals(Meta.CursorFactory.ARRAY, signature.cursorFactory);
+ Assert.assertEquals(Meta.StatementType.SELECT, signature.statementType);
+ Assert.assertEquals(SELECT_STAR_FROM_FOO, signature.sql);
+ Assert.assertEquals(
+ Lists.newArrayList(
+ Lists.newArrayList("__time", "TIMESTAMP", "java.lang.Long"),
+ Lists.newArrayList("dim1", "VARCHAR", "java.lang.String"),
+ Lists.newArrayList("dim2", "VARCHAR", "java.lang.String"),
+ Lists.newArrayList("dim3", "VARCHAR", "java.lang.String"),
+ Lists.newArrayList("cnt", "BIGINT", "java.lang.Number"),
+ Lists.newArrayList("m1", "FLOAT", "java.lang.Float"),
+ Lists.newArrayList("m2", "DOUBLE", "java.lang.Double"),
+ Lists.newArrayList("unique_dim1", "OTHER", "java.lang.Object")
+ ),
+ Lists.transform(
+ signature.columns,
+ new Function<ColumnMetaData, List<String>>()
+ {
+ @Override
+ public List<String> apply(final ColumnMetaData columnMetaData)
+ {
+ return Lists.newArrayList(
+ columnMetaData.label,
+ columnMetaData.type.name,
+ columnMetaData.type.rep.clazz.getName()
+ );
+ }
+ }
+ )
+ );
+ }
+
+ //-----------------------------------------------------------------
+ // Druid JDBC Prepared Statement
+ //
+ // The JDBC PreparedStatement class starts with, then allows executing
+ // the statement sequentially, typically with a set of parameters.
+
+ private DruidJdbcPreparedStatement jdbcPreparedStatement(SqlQueryPlus
queryPlus)
+ {
+ return new DruidJdbcPreparedStatement(
+ conn,
0,
- new QueryContext(),
- sqlLifecycleFactory.factorize(),
- () -> {}
- ).prepare(sql, -1, AllowAllAuthenticator.ALLOW_ALL_RESULT);
+ queryPlus,
+ sqlLifecycleFactory,
+ Long.MAX_VALUE
+ );
+ }
+
+ @Test
+ public void testSubQueryWithOrderByPrepared()
+ {
+ final String sql = "select T20.F13 as F22 from (SELECT DISTINCT dim1 as
F13 FROM druid.foo T10) T20 order by T20.F13 ASC";
+ SqlQueryPlus queryPlus = new SqlQueryPlus(
+ sql,
+ null,
+ null,
+ AllowAllAuthenticator.ALLOW_ALL_RESULT
+ );
+ try (final DruidJdbcPreparedStatement statement =
jdbcPreparedStatement(queryPlus)) {
+ statement.prepare();
+ // First frame, ask for all rows.
+ statement.execute(Collections.emptyList());
+ Meta.Frame frame =
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+ Assert.assertEquals(
+ subQueryWithOrderByResults(),
+ frame
+ );
+ Assert.assertTrue(statement.isDone());
+ }
+ }
+
+ @Test
+ public void testSubQueryWithOrderByPreparedTwice()
+ {
+ final String sql = "select T20.F13 as F22 from (SELECT DISTINCT dim1 as
F13 FROM druid.foo T10) T20 order by T20.F13 ASC";
+ SqlQueryPlus queryPlus = new SqlQueryPlus(
+ sql,
+ null,
+ null,
+ AllowAllAuthenticator.ALLOW_ALL_RESULT
+ );
+ try (final DruidJdbcPreparedStatement statement =
jdbcPreparedStatement(queryPlus)) {
+ statement.prepare();
+ statement.execute(Collections.emptyList());
+ Meta.Frame frame =
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+ Assert.assertEquals(
+ subQueryWithOrderByResults(),
+ frame
+ );
+
+ // Do it again. JDBC says we can reuse prepared statements sequentially.
+ Assert.assertTrue(statement.isDone());
+ statement.execute(Collections.emptyList());
+ frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+ Assert.assertEquals(
+ subQueryWithOrderByResults(),
+ frame
+ );
+ Assert.assertTrue(statement.isDone());
+ }
+ }
+
+ @Test
+ public void testSignaturePrepared()
+ {
+ SqlQueryPlus queryPlus = new SqlQueryPlus(
+ SELECT_STAR_FROM_FOO,
+ null,
+ null,
+ AllowAllAuthenticator.ALLOW_ALL_RESULT
+ );
+ try (final DruidJdbcPreparedStatement statement =
jdbcPreparedStatement(queryPlus)) {
+ statement.prepare();
+ verifySignature(statement.getSignature());
+ }
+ }
+
+ @Test
+ public void testParameters()
+ {
+ SqlQueryPlus queryPlus = new SqlQueryPlus(
+ "SELECT COUNT(*) AS cnt FROM sys.servers WHERE servers.host = ?",
+ null,
+ null,
+ AllowAllAuthenticator.ALLOW_ALL_RESULT
+ );
+ Meta.Frame expected = Meta.Frame.create(0, true,
Collections.singletonList(new Object[] {1L}));
+ List<TypedValue> matchingParams =
Collections.singletonList(TypedValue.ofLocal(ColumnMetaData.Rep.STRING,
"dummy"));
+ try (final DruidJdbcPreparedStatement statement =
jdbcPreparedStatement(queryPlus)) {
+
+ // PreparedStatement protocol: prepare once...
+ statement.prepare();
+
+ // Execute many times. First time.
+ statement.execute(matchingParams);
+ Meta.Frame frame =
statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+ Assert.assertEquals(
+ expected,
+ frame
+ );
+
+ // Again, same value.
+ statement.execute(matchingParams);
+ frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+ Assert.assertEquals(
+ expected,
+ frame
+ );
+
+ // Again, no matches.
+ statement.execute(
+ Collections.singletonList(
+ TypedValue.ofLocal(ColumnMetaData.Rep.STRING, "foo")));
+ frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6);
+ Assert.assertEquals(
+ Meta.Frame.create(0, true, Collections.emptyList()),
+ frame
+ );
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]