[
https://issues.apache.org/jira/browse/DRILL-4729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15412349#comment-15412349
]
ASF GitHub Bot commented on DRILL-4729:
---------------------------------------
Github user sudheeshkatkam commented on a diff in the pull request:
https://github.com/apache/drill/pull/530#discussion_r73942398
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
---
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.prepare;
+
+import static
org.apache.drill.exec.ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS;
+import static org.apache.drill.exec.proto.UserProtos.RequestStatus.FAILED;
+import static org.apache.drill.exec.proto.UserProtos.RequestStatus.OK;
+import static org.apache.drill.exec.proto.UserProtos.RequestStatus.TIMEOUT;
+
+import org.apache.drill.common.exceptions.ErrorHelper;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.proto.UserProtos.ColumnSearchability;
+import org.apache.drill.exec.proto.UserProtos.ColumnUpdatability;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq;
+import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatement;
+import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
+import org.apache.drill.exec.proto.UserProtos.RequestStatus;
+import org.apache.drill.exec.proto.UserProtos.ResultColumnMetadata;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
+import org.apache.drill.exec.work.user.UserWorker;
+import org.joda.time.Period;
+
+import com.google.common.collect.ImmutableMap;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import java.math.BigDecimal;
+import java.net.SocketAddress;
+import java.sql.Date;
+import java.sql.ResultSetMetaData;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Contains worker {@link Runnable} for creating a prepared statement and
helper methods.
+ */
+public class PreparedStatementProvider {
+ private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(PreparedStatementProvider.class);
+
+ /**
+ * Static list of mappings from {@link MinorType} to JDBC ResultSet
class name (to be returned through
+ * {@link ResultSetMetaData#getColumnClassName(int)}.
+ */
+ private static final Map<MinorType, String> DRILL_TYPE_TO_JDBC_CLASSNAME
= ImmutableMap.<MinorType, String>builder()
+ .put(MinorType.INT, Integer.class.getName())
+ .put(MinorType.BIGINT, Long.class.getName())
+ .put(MinorType.FLOAT4, Float.class.getName())
+ .put(MinorType.FLOAT8, Double.class.getName())
+ .put(MinorType.VARCHAR, String.class.getName())
+ .put(MinorType.BIT, Boolean.class.getName())
+ .put(MinorType.DATE, Date.class.getName())
+ .put(MinorType.DECIMAL9, BigDecimal.class.getName())
+ .put(MinorType.DECIMAL18, BigDecimal.class.getName())
+ .put(MinorType.DECIMAL28SPARSE, BigDecimal.class.getName())
+ .put(MinorType.DECIMAL38SPARSE, BigDecimal.class.getName())
+ .put(MinorType.TIME, Time.class.getName())
+ .put(MinorType.TIMESTAMP, Timestamp.class.getName())
+ .put(MinorType.VARBINARY, byte[].class.getName())
+ .put(MinorType.INTERVALYEAR, Period.class.getName())
+ .put(MinorType.INTERVALDAY, Period.class.getName())
+ .put(MinorType.MAP, Object.class.getName())
+ .put(MinorType.LIST, Object.class.getName())
+ .put(MinorType.UNION, Object.class.getName())
+ .build();
+
+ /**
+ * Runnable that creates a prepared statement for given {@link
CreatePreparedStatementReq} and
+ * sends the response at the end.
+ */
+ public static class PreparedStatementWorker implements Runnable {
+ private final UserClientConnection connection;
+ private final UserWorker userWorker;
+ private final ResponseSender responseSender;
+ private final CreatePreparedStatementReq req;
+
+ public PreparedStatementWorker(final UserClientConnection connection,
final UserWorker userWorker,
+ final ResponseSender responseSender, final
CreatePreparedStatementReq req) {
+ this.connection = connection;
+ this.userWorker = userWorker;
+ this.responseSender = responseSender;
+ this.req = req;
+ }
+
+ @Override
+ public void run() {
+ final CreatePreparedStatementResp.Builder respBuilder =
CreatePreparedStatementResp.newBuilder();
+ try {
+ UserClientConnectionWrapper wrapper = new
UserClientConnectionWrapper(connection);
+
+ final RunQuery limit0Query =
+ RunQuery.newBuilder()
+ .setType(QueryType.SQL)
+ .setPlan(String.format("SELECT * FROM (%s) LIMIT 0",
req.getSqlQuery()))
+ .build();
+
+ final QueryId limit0QueryId = userWorker.submitWork(wrapper,
limit0Query);
+
+ final long timeout_millis =
+
userWorker.getSystemOptions().getOption(CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS).num_val;
+
+ try {
+ if (!wrapper.await(timeout_millis)) {
+ logger.error("LIMIT 0 query (QueryId: {}) for prepared
statement took longer than {} ms. Cancelling.",
+ limit0QueryId, timeout_millis);
+ userWorker.cancelQuery(limit0QueryId);
+ final String errorMsg = String.format(
+ "LIMIT 0 query (QueryId: %s) for prepared statement took
longer than %d ms. " +
+ "Query cancellation requested.\n" +
+ "Retry after changing the option '%s' to a higher
value.",
+ limit0QueryId, timeout_millis,
CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS);
+ setErrorHelper(respBuilder, TIMEOUT, null, errorMsg,
ErrorType.SYSTEM);
+ return;
+ }
+ } catch (InterruptedException ex) {
+ setErrorHelper(respBuilder, FAILED, ex, "Prepared statement
creation interrupted.", ErrorType.SYSTEM);
+ return;
+ }
+
+ if (wrapper.getError() != null) {
+ setErrorHelper(respBuilder, wrapper.getError(), "Failed to get
result set schema for prepare statement.");
+ return;
+ }
+
+ final PreparedStatement.Builder prepStmtBuilder =
PreparedStatement.newBuilder();
+
+ for (SerializedField field : wrapper.getFields()) {
+ prepStmtBuilder.addColumns(serializeColumn(field));
+ }
+
+ prepStmtBuilder.setServerHandle(
+ PreparedStatementHandle.newBuilder()
+ .setServerInfo(
+ ServerPreparedStatementState.newBuilder()
+ .setSqlQuery(req.getSqlQuery())
+ .build().toByteString()
+ )
+ );
+
+ respBuilder.setStatus(OK);
+ respBuilder.setPreparedStatement(prepStmtBuilder.build());
+ } catch (Throwable e) {
+ setErrorHelper(respBuilder, FAILED, e, "Failed to create prepared
statement.", ErrorType.SYSTEM);
+ } finally {
+ responseSender.send(new Response(RpcType.PREPARED_STATEMENT,
respBuilder.build()));
+ }
+ }
+ }
+
+ /**
+ * Helper method to create {@link DrillPBError} and set it in
<code>respBuilder</code>
+ */
+ private static void setErrorHelper(final
CreatePreparedStatementResp.Builder respBuilder, final RequestStatus status,
+ final Throwable ex, final String message, final ErrorType errorType)
{
+ respBuilder.setStatus(status);
+ final String errorId = UUID.randomUUID().toString();
+ if (ex != null) {
+ logger.error("{} ErrorId: {}", message, errorId, ex);
+ } else {
+ logger.error("{} ErrorId: {}", message, errorId);
+ }
+
+ final DrillPBError.Builder builder = DrillPBError.newBuilder();
+ builder.setErrorType(errorType);
+ builder.setErrorId(errorId);
+ builder.setMessage(message);
+
+ if (ex != null) {
+ builder.setException(ErrorHelper.getWrapper(ex));
+ }
+
+ respBuilder.setError(builder.build());
+ }
+
+ /**
+ * Helper method to log error and set given {@link DrillPBError} in
<code>respBuilder</code>
+ */
+ private static void setErrorHelper(final
CreatePreparedStatementResp.Builder respBuilder, final DrillPBError error,
+ final String message) {
+ respBuilder.setStatus(FAILED);
+ final String errorId = UUID.randomUUID().toString();
+ logger.error("{} ErrorId: {}", message, errorId);
+
+ respBuilder.setError(error);
+ }
+
+ /**
+ * Decorator around {@link UserClientConnection} to tap the query
results for LIMIT 0 query.
+ */
+ private static class UserClientConnectionWrapper implements
UserClientConnection {
+ private final UserClientConnection inner;
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ private DrillPBError error;
+ private List<SerializedField> fields;
+
+ UserClientConnectionWrapper(UserClientConnection inner) {
+ this.inner = inner;
+ }
+
+ @Override
+ public UserSession getSession() {
+ return inner.getSession();
+ }
+
+ @Override
+ public ChannelFuture getChannelClosureFuture() {
+ return inner.getChannelClosureFuture();
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return inner.getRemoteAddress();
+ }
+
+ @Override
+ public void sendResult(RpcOutcomeListener<Ack> listener, QueryResult
result) {
+ // Release the wait latch if the query is terminated.
+ final QueryState state = result.getQueryState();
+ if (state == QueryState.FAILED || state == QueryState.CANCELED ||
state == QueryState.COMPLETED) {
+ if (state == QueryState.FAILED) {
+ error = result.getError(0);
+ }
+ latch.countDown();
+ }
+
+ listener.success(Acks.OK, null);
+ }
+
+ @Override
+ public void sendData(RpcOutcomeListener<Ack> listener,
QueryWritableBatch result) {
+ // Save the query results schema and release the buffers.
+ if (fields == null) {
+ fields = result.getHeader().getDef().getFieldList();
+ }
+
+ for(ByteBuf buf : result.getBuffers()) {
+ buf.release();
+ }
+
+ listener.success(Acks.OK, null);
+ }
+
+ /**
+ * Wait until the query has completed.
+ * @throws InterruptedException
+ */
+ boolean await(final long timeoutMillis) throws InterruptedException {
+ return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * @return Any error returned in query execution.
+ */
+ DrillPBError getError() {
+ return error;
+ }
+
+ /**
+ * @return Schema returned in query result batch.
+ */
+ List<SerializedField> getFields() {
+ return fields;
+ }
+ }
+
+ /**
+ * Serialize the given {@link SerializedField} into a {@link
ResultColumnMetadata}.
+ * @param field
+ * @return
+ */
+ private static ResultColumnMetadata serializeColumn(SerializedField
field) {
+ final ResultColumnMetadata.Builder builder =
ResultColumnMetadata.newBuilder();
+ final MajorType majorType = field.getMajorType();
+ final MinorType minorType = majorType.getMinorType();
+
+ /**
+ * Defaults to "DRILL" as drill has as only one catalog.
+ */
+ builder.setCatalogName(InfoSchemaConstants.IS_CATALOG_NAME);
+
+ /**
+ * Designated column's schema name. Empty string if not applicable.
Initial implementation defaults to empty string
+ * as we use LIMIT 0 queries to get the schema and schema info is
lost. If we derive the schema from plan, we may
+ * get the right value.
+ */
+ builder.setSchemaName("");
+
+ /**
+ * Designated column's table name. Not set if not applicable. Initial
implementation defaults to empty string as
+ * we use LIMIT 0 queries to get the schema and table info is lost. If
we derive the table from plan, we may get
+ * the right value.
+ */
+ builder.setTableName("");
+
+ builder.setColumnName(field.getNamePart().getName());
+
+ /**
+ * Column label name for display or print purposes.
+ * Ex. a column named "empName" might be labeled as "Employee Name".
+ * Initial implementation defaults to same value as column name.
+ */
+ builder.setLabel(field.getNamePart().getName());
+
+ /**
+ * Data type in string format. Value is SQL standard type.
+ */
+ builder.setDataType(Types.getSqlTypeName(majorType));
+
+ builder.setIsNullable(majorType.getMode() == DataMode.OPTIONAL);
+
+ /**
+ * For numeric data, this is the maximum precision.
+ * For character data, this is the length in characters.
+ * For datetime datatypes, this is the length in characters of the
String representation
+ * (assuming the maximum allowed precision of the fractional
seconds component).
+ * For binary data, this is the length in bytes.
+ * For all other types 0 is returned where the column size is not
applicable.
+ */
+ builder.setPrecision(getPrecision(field.getMajorType()));
+
+ /**
+ * Column's number of digits to right of the decimal point. 0 is
returned for types where the scale is not applicable
+ */
+ builder.setScale(getScale(majorType));
+
+ /**
+ * Indicates whether values in the designated column are signed
numbers.
+ */
+ builder.setSigned(Types.isNumericType(majorType));
+
+ /**
+ * Maximum number of characters required to display data from the
column. Initial implementation hard coded to 10.
+ */
+ builder.setDisplaySize(10);
+
+ /**
+ * Is the column an aliased column. Initial implementation defaults to
true as we derive schema from LIMIT 0 query and
+ * not plan
+ */
+ builder.setIsAliased(true);
+
+ builder.setSearchability(ColumnSearchability.ALL);
+ builder.setUpdatability(ColumnUpdatability.READ_ONLY);
+ builder.setAutoIncrement(false);
+ builder.setCaseSensitivity(false);
+ builder.setSortable(isSortable(minorType));
+
+ /**
+ * Returns the fully-qualified name of the Java class whose instances
are manufactured if the method
+ * ResultSet.getObject is called to retrieve a value from the column.
Applicable only to JDBC clients.
+ */
+ builder.setClassName(DRILL_TYPE_TO_JDBC_CLASSNAME.get(minorType));
+
+ builder.setIsCurrency(false);
+
+ return builder.build();
+ }
+
+ private static int getPrecision(MajorType majorType) {
--- End diff --
Move these helpers to `Types` class?
> Add support for prepared statement implementation on server side
> ----------------------------------------------------------------
>
> Key: DRILL-4729
> URL: https://issues.apache.org/jira/browse/DRILL-4729
> Project: Apache Drill
> Issue Type: Sub-task
> Components: Metadata
> Reporter: Venki Korukanti
> Assignee: Venki Korukanti
> Fix For: 1.8.0
>
>
> Currently Drill JDBC/ODBC driver implements its own prepared statement
> implementation, which basically issues limit 0 query to get the metadata and
> then executes the actual query. So the query is planned twice (for metadata
> fetch and actual execution). Proposal is to move that logic to server where
> we can make optimizations without disrupting/updating the JDBC/ODBC drivers.
> * {{PreparedStatement createPreparedStatement(String query)}}.
> {{PreparedStatement}} object contains the following:
> ** {{ResultSetMetadata getResultSetMetadata()}}
> *** {{ResultsSetMetadata}} contains methods to fetch info about output
> columns of the query. What info these methods provide is given in this
> [spreadsheet|https://docs.google.com/spreadsheets/d/1A6nqUQo5xJaZDQlDTittpVrK7t4Kylycs3P32Yn_O5k/edit?usp=sharing].
> It lists the ODBC/JDBC requirements and what Drill will provided through
> object {{ResultsSetMetadata}}.
> *** Server can put more info here which is opaque to client and use it in
> server when the client sends execute prepared statement query request.
> Overload the current submit query API to take the {{PreparedStatement}}
> returned above.
> In the initial implementation, server side implementation of
> {{createPreparedStatement}} API is implemented as follows:
> * Runs the query with {{LIMIT 0}}, gets the schema
> * Convert the query into a binary blob and set it as opaque object in
> {{PreparedStatement}}.
> When the {{PreparedStatement}} is submitted for execution, reconstruct the
> query from binary blob in opaque component of {{PreparedStatement}} and
> execute it from scratch.
> Opaque component of the {{PreparedStatement}} is where we can save more
> information which we can use for optimizations/speedups.
> NOTE: We are not going to worry about parameters in prepared query in initial
> implementation. We can provide the functionality later if there is sufficient
> demand from Drill community.
> Changes in this patch are going to include protobuf messages, server side
> messages and Java client APIs. Native client changes are going to be tracked
> in a separate JIRA.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)