[ 
https://issues.apache.org/jira/browse/DRILL-4729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15412349#comment-15412349
 ] 

ASF GitHub Bot commented on DRILL-4729:
---------------------------------------

Github user sudheeshkatkam commented on a diff in the pull request:

    https://github.com/apache/drill/pull/530#discussion_r73942398
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
 ---
    @@ -0,0 +1,427 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.work.prepare;
    +
    +import static 
org.apache.drill.exec.ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS;
    +import static org.apache.drill.exec.proto.UserProtos.RequestStatus.FAILED;
    +import static org.apache.drill.exec.proto.UserProtos.RequestStatus.OK;
    +import static org.apache.drill.exec.proto.UserProtos.RequestStatus.TIMEOUT;
    +
    +import org.apache.drill.common.exceptions.ErrorHelper;
    +import org.apache.drill.common.types.TypeProtos.DataMode;
    +import org.apache.drill.common.types.TypeProtos.MajorType;
    +import org.apache.drill.common.types.TypeProtos.MinorType;
    +import org.apache.drill.common.types.Types;
    +import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
    +import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState;
    +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    +import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
    +import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
    +import org.apache.drill.exec.proto.UserBitShared.QueryId;
    +import org.apache.drill.exec.proto.UserBitShared.QueryResult;
    +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
    +import org.apache.drill.exec.proto.UserBitShared.QueryType;
    +import org.apache.drill.exec.proto.UserBitShared.SerializedField;
    +import org.apache.drill.exec.proto.UserProtos.ColumnSearchability;
    +import org.apache.drill.exec.proto.UserProtos.ColumnUpdatability;
    +import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq;
    +import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp;
    +import org.apache.drill.exec.proto.UserProtos.PreparedStatement;
    +import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
    +import org.apache.drill.exec.proto.UserProtos.RequestStatus;
    +import org.apache.drill.exec.proto.UserProtos.ResultColumnMetadata;
    +import org.apache.drill.exec.proto.UserProtos.RpcType;
    +import org.apache.drill.exec.proto.UserProtos.RunQuery;
    +import org.apache.drill.exec.rpc.Acks;
    +import org.apache.drill.exec.rpc.Response;
    +import org.apache.drill.exec.rpc.ResponseSender;
    +import org.apache.drill.exec.rpc.RpcOutcomeListener;
    +import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
    +import org.apache.drill.exec.rpc.user.UserSession;
    +import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
    +import org.apache.drill.exec.work.user.UserWorker;
    +import org.joda.time.Period;
    +
    +import com.google.common.collect.ImmutableMap;
    +
    +import io.netty.buffer.ByteBuf;
    +import io.netty.channel.ChannelFuture;
    +import java.math.BigDecimal;
    +import java.net.SocketAddress;
    +import java.sql.Date;
    +import java.sql.ResultSetMetaData;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Contains worker {@link Runnable} for creating a prepared statement and 
helper methods.
    + */
    +public class PreparedStatementProvider {
    +  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PreparedStatementProvider.class);
    +
    +  /**
    +   * Static list of mappings from {@link MinorType} to JDBC ResultSet 
class name (to be returned through
    +   * {@link ResultSetMetaData#getColumnClassName(int)}.
    +   */
    +  private static final Map<MinorType, String> DRILL_TYPE_TO_JDBC_CLASSNAME 
= ImmutableMap.<MinorType, String>builder()
    +      .put(MinorType.INT, Integer.class.getName())
    +      .put(MinorType.BIGINT, Long.class.getName())
    +      .put(MinorType.FLOAT4, Float.class.getName())
    +      .put(MinorType.FLOAT8, Double.class.getName())
    +      .put(MinorType.VARCHAR, String.class.getName())
    +      .put(MinorType.BIT, Boolean.class.getName())
    +      .put(MinorType.DATE, Date.class.getName())
    +      .put(MinorType.DECIMAL9, BigDecimal.class.getName())
    +      .put(MinorType.DECIMAL18, BigDecimal.class.getName())
    +      .put(MinorType.DECIMAL28SPARSE, BigDecimal.class.getName())
    +      .put(MinorType.DECIMAL38SPARSE, BigDecimal.class.getName())
    +      .put(MinorType.TIME, Time.class.getName())
    +      .put(MinorType.TIMESTAMP, Timestamp.class.getName())
    +      .put(MinorType.VARBINARY, byte[].class.getName())
    +      .put(MinorType.INTERVALYEAR, Period.class.getName())
    +      .put(MinorType.INTERVALDAY, Period.class.getName())
    +      .put(MinorType.MAP, Object.class.getName())
    +      .put(MinorType.LIST, Object.class.getName())
    +      .put(MinorType.UNION, Object.class.getName())
    +      .build();
    +
    +  /**
    +   * Runnable that creates a prepared statement for given {@link 
CreatePreparedStatementReq} and
    +   * sends the response at the end.
    +   */
    +  public static class PreparedStatementWorker implements Runnable {
    +    private final UserClientConnection connection;
    +    private final UserWorker userWorker;
    +    private final ResponseSender responseSender;
    +    private final CreatePreparedStatementReq req;
    +
    +    public PreparedStatementWorker(final UserClientConnection connection, 
final UserWorker userWorker,
    +        final ResponseSender responseSender, final 
CreatePreparedStatementReq req) {
    +      this.connection = connection;
    +      this.userWorker = userWorker;
    +      this.responseSender = responseSender;
    +      this.req = req;
    +    }
    +
    +    @Override
    +    public void run() {
    +      final CreatePreparedStatementResp.Builder respBuilder = 
CreatePreparedStatementResp.newBuilder();
    +      try {
    +        UserClientConnectionWrapper wrapper = new 
UserClientConnectionWrapper(connection);
    +
    +        final RunQuery limit0Query =
    +            RunQuery.newBuilder()
    +                .setType(QueryType.SQL)
    +                .setPlan(String.format("SELECT * FROM (%s) LIMIT 0", 
req.getSqlQuery()))
    +                .build();
    +
    +        final QueryId limit0QueryId = userWorker.submitWork(wrapper, 
limit0Query);
    +
    +        final long timeout_millis =
    +            
userWorker.getSystemOptions().getOption(CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS).num_val;
    +
    +        try {
    +          if (!wrapper.await(timeout_millis)) {
    +            logger.error("LIMIT 0 query (QueryId: {}) for prepared 
statement took longer than {} ms. Cancelling.",
    +                limit0QueryId, timeout_millis);
    +            userWorker.cancelQuery(limit0QueryId);
    +            final String errorMsg = String.format(
    +                "LIMIT 0 query (QueryId: %s) for prepared statement took 
longer than %d ms. " +
    +                    "Query cancellation requested.\n" +
    +                    "Retry after changing the option '%s' to a higher 
value.",
    +                limit0QueryId, timeout_millis, 
CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS);
    +            setErrorHelper(respBuilder, TIMEOUT, null, errorMsg, 
ErrorType.SYSTEM);
    +            return;
    +          }
    +        } catch (InterruptedException ex) {
    +          setErrorHelper(respBuilder, FAILED, ex, "Prepared statement 
creation interrupted.", ErrorType.SYSTEM);
    +          return;
    +        }
    +
    +        if (wrapper.getError() != null) {
    +          setErrorHelper(respBuilder, wrapper.getError(), "Failed to get 
result set schema for prepare statement.");
    +          return;
    +        }
    +
    +        final PreparedStatement.Builder prepStmtBuilder = 
PreparedStatement.newBuilder();
    +
    +        for (SerializedField field : wrapper.getFields()) {
    +          prepStmtBuilder.addColumns(serializeColumn(field));
    +        }
    +
    +        prepStmtBuilder.setServerHandle(
    +            PreparedStatementHandle.newBuilder()
    +                .setServerInfo(
    +                    ServerPreparedStatementState.newBuilder()
    +                        .setSqlQuery(req.getSqlQuery())
    +                        .build().toByteString()
    +                )
    +        );
    +
    +        respBuilder.setStatus(OK);
    +        respBuilder.setPreparedStatement(prepStmtBuilder.build());
    +      } catch (Throwable e) {
    +        setErrorHelper(respBuilder, FAILED, e, "Failed to create prepared 
statement.", ErrorType.SYSTEM);
    +      } finally {
    +        responseSender.send(new Response(RpcType.PREPARED_STATEMENT, 
respBuilder.build()));
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Helper method to create {@link DrillPBError} and set it in 
<code>respBuilder</code>
    +   */
    +  private static void setErrorHelper(final 
CreatePreparedStatementResp.Builder respBuilder, final RequestStatus status,
    +      final Throwable ex, final String message, final ErrorType errorType) 
{
    +    respBuilder.setStatus(status);
    +    final String errorId = UUID.randomUUID().toString();
    +    if (ex != null) {
    +      logger.error("{} ErrorId: {}", message, errorId, ex);
    +    } else {
    +      logger.error("{} ErrorId: {}", message, errorId);
    +    }
    +
    +    final DrillPBError.Builder builder = DrillPBError.newBuilder();
    +    builder.setErrorType(errorType);
    +    builder.setErrorId(errorId);
    +    builder.setMessage(message);
    +
    +    if (ex != null) {
    +      builder.setException(ErrorHelper.getWrapper(ex));
    +    }
    +
    +    respBuilder.setError(builder.build());
    +  }
    +
    +  /**
    +   * Helper method to log error and set given {@link DrillPBError} in 
<code>respBuilder</code>
    +   */
    +  private static void setErrorHelper(final 
CreatePreparedStatementResp.Builder respBuilder, final DrillPBError error,
    +      final String message) {
    +    respBuilder.setStatus(FAILED);
    +    final String errorId = UUID.randomUUID().toString();
    +    logger.error("{} ErrorId: {}", message, errorId);
    +
    +    respBuilder.setError(error);
    +  }
    +
    +  /**
    +   * Decorator around {@link UserClientConnection} to tap the query 
results for LIMIT 0 query.
    +   */
    +  private static class UserClientConnectionWrapper implements 
UserClientConnection {
    +    private final UserClientConnection inner;
    +    private final CountDownLatch latch = new CountDownLatch(1);
    +
    +    private DrillPBError error;
    +    private List<SerializedField> fields;
    +
    +    UserClientConnectionWrapper(UserClientConnection inner) {
    +      this.inner = inner;
    +    }
    +
    +    @Override
    +    public UserSession getSession() {
    +      return inner.getSession();
    +    }
    +
    +    @Override
    +    public ChannelFuture getChannelClosureFuture() {
    +      return inner.getChannelClosureFuture();
    +    }
    +
    +    @Override
    +    public SocketAddress getRemoteAddress() {
    +      return inner.getRemoteAddress();
    +    }
    +
    +    @Override
    +    public void sendResult(RpcOutcomeListener<Ack> listener, QueryResult 
result) {
    +      // Release the wait latch if the query is terminated.
    +      final QueryState state = result.getQueryState();
    +      if (state == QueryState.FAILED || state  == QueryState.CANCELED || 
state == QueryState.COMPLETED) {
    +        if (state == QueryState.FAILED) {
    +          error = result.getError(0);
    +        }
    +        latch.countDown();
    +      }
    +
    +      listener.success(Acks.OK, null);
    +    }
    +
    +    @Override
    +    public void sendData(RpcOutcomeListener<Ack> listener, 
QueryWritableBatch result) {
    +      // Save the query results schema and release the buffers.
    +      if (fields == null) {
    +        fields = result.getHeader().getDef().getFieldList();
    +      }
    +
    +      for(ByteBuf buf : result.getBuffers()) {
    +        buf.release();
    +      }
    +
    +      listener.success(Acks.OK, null);
    +    }
    +
    +    /**
    +     * Wait until the query has completed.
    +     * @throws InterruptedException
    +     */
    +    boolean await(final long timeoutMillis) throws InterruptedException {
    +      return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    +    }
    +
    +    /**
    +     * @return Any error returned in query execution.
    +     */
    +    DrillPBError getError() {
    +      return error;
    +    }
    +
    +    /**
    +     * @return Schema returned in query result batch.
    +     */
    +    List<SerializedField> getFields() {
    +      return fields;
    +    }
    +  }
    +
    +  /**
    +   * Serialize the given {@link SerializedField} into a {@link 
ResultColumnMetadata}.
    +   * @param field
    +   * @return
    +   */
    +  private static ResultColumnMetadata serializeColumn(SerializedField 
field) {
    +    final ResultColumnMetadata.Builder builder = 
ResultColumnMetadata.newBuilder();
    +    final MajorType majorType = field.getMajorType();
    +    final MinorType minorType = majorType.getMinorType();
    +
    +    /**
    +     * Defaults to "DRILL" as drill has as only one catalog.
    +     */
    +    builder.setCatalogName(InfoSchemaConstants.IS_CATALOG_NAME);
    +
    +    /**
    +     * Designated column's schema name. Empty string if not applicable. 
Initial implementation defaults to empty string
    +     * as we use LIMIT 0 queries to get the schema and schema info is 
lost. If we derive the schema from plan, we may
    +     * get the right value.
    +     */
    +    builder.setSchemaName("");
    +
    +    /**
    +     * Designated column's table name. Not set if not applicable. Initial 
implementation defaults to empty string as
    +     * we use LIMIT 0 queries to get the schema and table info is lost. If 
we derive the table from plan, we may get
    +     * the right value.
    +     */
    +    builder.setTableName("");
    +
    +    builder.setColumnName(field.getNamePart().getName());
    +
    +    /**
    +     * Column label name for display or print purposes.
    +     * Ex. a column named "empName" might be labeled as "Employee Name".
    +     * Initial implementation defaults to same value as column name.
    +     */
    +    builder.setLabel(field.getNamePart().getName());
    +
    +    /**
    +     * Data type in string format. Value is SQL standard type.
    +     */
    +    builder.setDataType(Types.getSqlTypeName(majorType));
    +
    +    builder.setIsNullable(majorType.getMode() == DataMode.OPTIONAL);
    +
    +    /**
    +     * For numeric data, this is the maximum precision.
    +     * For character data, this is the length in characters.
    +     * For datetime datatypes, this is the length in characters of the 
String representation
    +     *    (assuming the maximum allowed precision of the fractional 
seconds component).
    +     * For binary data, this is the length in bytes.
    +     * For all other types 0 is returned where the column size is not 
applicable.
    +     */
    +    builder.setPrecision(getPrecision(field.getMajorType()));
    +
    +    /**
    +     * Column's number of digits to right of the decimal point. 0 is 
returned for types where the scale is not applicable
    +     */
    +    builder.setScale(getScale(majorType));
    +
    +    /**
    +     * Indicates whether values in the designated column are signed 
numbers.
    +     */
    +    builder.setSigned(Types.isNumericType(majorType));
    +
    +    /**
    +     * Maximum number of characters required to display data from the 
column. Initial implementation hard coded to 10.
    +     */
    +    builder.setDisplaySize(10);
    +
    +    /**
    +     * Is the column an aliased column. Initial implementation defaults to 
true as we derive schema from LIMIT 0 query and
    +     * not plan
    +     */
    +    builder.setIsAliased(true);
    +
    +    builder.setSearchability(ColumnSearchability.ALL);
    +    builder.setUpdatability(ColumnUpdatability.READ_ONLY);
    +    builder.setAutoIncrement(false);
    +    builder.setCaseSensitivity(false);
    +    builder.setSortable(isSortable(minorType));
    +
    +    /**
    +     * Returns the fully-qualified name of the Java class whose instances 
are manufactured if the method
    +     * ResultSet.getObject is called to retrieve a value from the column. 
Applicable only to JDBC clients.
    +     */
    +    builder.setClassName(DRILL_TYPE_TO_JDBC_CLASSNAME.get(minorType));
    +
    +    builder.setIsCurrency(false);
    +
    +    return builder.build();
    +  }
    +
    +  private static int getPrecision(MajorType majorType) {
    --- End diff --
    
    Move these helpers to `Types` class?


> Add support for prepared statement implementation on server side
> ----------------------------------------------------------------
>
>                 Key: DRILL-4729
>                 URL: https://issues.apache.org/jira/browse/DRILL-4729
>             Project: Apache Drill
>          Issue Type: Sub-task
>          Components: Metadata
>            Reporter: Venki Korukanti
>            Assignee: Venki Korukanti
>             Fix For: 1.8.0
>
>
> Currently Drill JDBC/ODBC driver implements its own prepared statement 
> implementation, which basically issues limit 0 query to get the metadata and 
> then executes the actual query. So the query is planned twice (for metadata 
> fetch and actual execution). Proposal is to move that logic to server where 
> we can make optimizations without disrupting/updating the JDBC/ODBC drivers.
> *  {{PreparedStatement createPreparedStatement(String query)}}. 
> {{PreparedStatement}} object contains the following:
> ** {{ResultSetMetadata getResultSetMetadata()}}
> *** {{ResultsSetMetadata}} contains methods to fetch info about output 
> columns of the query. What info these methods provide is given in this 
> [spreadsheet|https://docs.google.com/spreadsheets/d/1A6nqUQo5xJaZDQlDTittpVrK7t4Kylycs3P32Yn_O5k/edit?usp=sharing].
>  It lists the ODBC/JDBC requirements and what Drill will provided through 
> object {{ResultsSetMetadata}}.
> *** Server can put more info here which is opaque to client and use it in 
> server when the client sends execute prepared statement query request. 
> Overload the current submit query API to take the {{PreparedStatement}} 
> returned above. 
> In the initial implementation, server side implementation of 
> {{createPreparedStatement}} API is implemented as follows:
> * Runs the query with {{LIMIT 0}}, gets the schema
> * Convert the query into a binary blob and set it as opaque object in 
> {{PreparedStatement}}.
> When the {{PreparedStatement}} is submitted for execution, reconstruct the 
> query from binary blob in opaque component of {{PreparedStatement}} and 
> execute it from scratch. 
> Opaque component of the {{PreparedStatement}} is where we can save more 
> information which we can use for optimizations/speedups.
> NOTE: We are not going to worry about parameters in prepared query in initial 
> implementation. We can provide the functionality later if there is sufficient 
> demand from Drill community.
> Changes in this patch are going to include protobuf messages, server side 
> messages and Java client APIs. Native client changes are going to be tracked 
> in a separate JIRA.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to