This is an automated email from the ASF dual-hosted git repository. cgivre pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 75f652ab7d66cbed82495dccf7409ba4cccdedf1 Author: Paul Rogers <[email protected]> AuthorDate: Mon May 4 14:17:18 2020 -0700 DRILL-7730: Improve web query efficiency Implements a direct transfer of batches from Screen to web client. Cleans up web client query processing to avoid duplicate schema info. Much related code cleanup. --- .../org/apache/drill/exec/client/DrillClient.java | 83 ++++----- .../java/org/apache/drill/exec/client/DumpCat.java | 4 +- .../client/InvalidConnectionInfoException.java | 1 + .../drill/exec/client/LoggingResultsListener.java | 4 +- .../apache/drill/exec/client/QuerySubmitter.java | 2 - .../drill/exec/ops/AccountingUserConnection.java | 9 +- .../drill/exec/physical/impl/ScreenCreator.java | 55 +++--- .../drill/exec/physical/impl/filter/Filterer.java | 9 +- .../impl/materialize/QueryDataPackage.java | 124 +++++++++++++ .../impl/materialize/QueryWritableBatch.java | 1 - .../impl/materialize/RecordMaterializer.java | 5 + .../impl/materialize/VectorRecordMaterializer.java | 24 ++- .../exec/record/AbstractSingleRecordBatch.java | 15 +- .../org/apache/drill/exec/record/BatchSchema.java | 1 - .../drill/exec/record/RecordBatchLoader.java | 27 +-- .../apache/drill/exec/record/WritableBatch.java | 10 +- .../AbstractDisposableUserClientConnection.java | 16 +- .../drill/exec/rpc/UserClientConnection.java | 23 +-- .../rpc/user/AwaitableUserResultsListener.java | 1 - .../drill/exec/rpc/user/QueryResultHandler.java | 9 +- .../drill/exec/rpc/user/UserResultsListener.java | 22 ++- .../org/apache/drill/exec/rpc/user/UserServer.java | 42 ++--- .../apache/drill/exec/rpc/user/UserSession.java | 6 +- .../exec/server/rest/BaseWebUserConnection.java | 57 ++++++ .../drill/exec/server/rest/DrillRestServer.java | 26 ++- .../drill/exec/server/rest/LogsResources.java | 14 +- .../drill/exec/server/rest/QueryWrapper.java | 3 +- .../drill/exec/server/rest/RestQueryRunner.java | 25 +-- .../apache/drill/exec/server/rest/WebServer.java | 12 +- 
.../exec/server/rest/WebSessionResources.java | 13 +- .../drill/exec/server/rest/WebUserConnection.java | 195 ++++++++------------- .../org/apache/drill/exec/work/WorkManager.java | 8 +- .../work/prepare/PreparedStatementProvider.java | 55 +++--- .../apache/drill/exec/work/user/UserWorker.java | 4 +- .../src/main/java/io/netty/buffer/DrillBuf.java | 66 ++++--- .../drill/exec/memory/AllocationManager.java | 24 +-- .../drill/exec/rpc/BaseRpcOutcomeListener.java | 10 +- .../org/apache/drill/exec/rpc/RequestIdMap.java | 17 +- .../apache/drill/exec/rpc/RpcOutcomeListener.java | 6 +- 39 files changed, 585 insertions(+), 443 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 237aba1..fdbb971 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -93,6 +93,8 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; import org.apache.drill.shaded.guava.com.google.common.base.Strings; import org.apache.drill.shaded.guava.com.google.common.util.concurrent.SettableFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.netty.channel.EventLoopGroup; @@ -101,16 +103,15 @@ import io.netty.channel.EventLoopGroup; * String into ByteBuf. 
*/ public class DrillClient implements Closeable, ConnectionThrottle { + private static Logger logger = LoggerFactory.getLogger(DrillClient.class); public static final String DEFAULT_CLIENT_NAME = "Apache Drill Java client"; - - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class); - private static final ObjectMapper objectMapper = new ObjectMapper(); + private final DrillConfig config; private UserClient client; private DrillProperties properties; private volatile ClusterCoordinator clusterCoordinator; - private volatile boolean connected = false; + private volatile boolean connected; private final BufferAllocator allocator; private final int reconnectTimes; private final int reconnectDelay; @@ -199,10 +200,12 @@ public class DrillClient implements Closeable, ConnectionThrottle { } /** - * Sets whether the application is willing to accept complex types (Map, Arrays) in the returned result set. - * Default is {@code true}. If set to {@code false}, the complex types are returned as JSON encoded VARCHAR type. + * Sets whether the application is willing to accept complex types (Map, + * Arrays) in the returned result set. Default is {@code true}. If set to + * {@code false}, the complex types are returned as JSON encoded VARCHAR type. * - * @throws IllegalStateException if called after a connection has been established. + * @throws IllegalStateException + * if called after a connection has been established. 
*/ public void setSupportComplexTypes(boolean supportComplexTypes) { if (connected) { @@ -468,10 +471,10 @@ public class DrillClient implements Closeable, ConnectionThrottle { */ @Override public void close() { - if (this.client != null) { - this.client.close(); + if (client != null) { + client.close(); } - if (this.ownsAllocator && allocator != null) { + if (ownsAllocator && allocator != null) { DrillAutoCloseables.closeNoChecked(allocator); } if (ownsZkConnection) { @@ -486,19 +489,17 @@ public class DrillClient implements Closeable, ConnectionThrottle { } if (eventLoopGroup != null) { eventLoopGroup.shutdownGracefully(); + eventLoopGroup = null; } if (executor != null) { executor.shutdownNow(); + executor = null; } - // TODO: Did DRILL-1735 changes cover this TODO?: - // TODO: fix tests that fail when this is called. - //allocator.close(); connected = false; } - /** * Return the server infos. Only available after connecting * @@ -585,11 +586,15 @@ public class DrillClient implements Closeable, ConnectionThrottle { /** * API to just plan a query without execution + * * @param type * @param query - * @param isSplitPlan - option to tell whether to return single or split plans for a query - * @return list of PlanFragments that can be used later on in {@link #runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType, java.util.List, org.apache.drill.exec.rpc.user.UserResultsListener)} - * to run a query without additional planning + * @param isSplitPlan + * option to tell whether to return single or split plans for a query + * @return list of PlanFragments that can be used later on in + * {@link #runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType, + * java.util.List, org.apache.drill.exec.rpc.user.UserResultsListener)} + * to run a query without additional planning */ public DrillRpcFuture<QueryPlanFragments> planQuery(QueryType type, String query, boolean isSplitPlan) { GetQueryPlanFragments runQuery = 
GetQueryPlanFragments.newBuilder().setQuery(query).setType(type).setSplitPlan(isSplitPlan).build(); @@ -632,7 +637,7 @@ public class DrillClient implements Closeable, ConnectionThrottle { client.submitQuery(resultsListener, query); } - /* + /** * Helper method to generate the UserCredentials message from the properties. */ private UserBitShared.UserCredentials getUserCredentials() { @@ -660,10 +665,10 @@ public class DrillClient implements Closeable, ConnectionThrottle { } /** - * Get the list of catalogs in <code>INFORMATION_SCHEMA.CATALOGS</code> table satisfying the given filters. + * Get the list of catalogs in {@code INFORMATION_SCHEMA.CATALOGS} table satisfying the given filters. * - * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter. - * @return The list of catalogs in <code>INFORMATION_SCHEMA.CATALOGS</code> table satisfying the given filters. + * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter. + * @return The list of catalogs in {@code INFORMATION_SCHEMA.CATALOGS} table satisfying the given filters. */ public DrillRpcFuture<GetCatalogsResp> getCatalogs(LikeFilter catalogNameFilter) { final GetCatalogsReq.Builder reqBuilder = GetCatalogsReq.newBuilder(); @@ -675,11 +680,11 @@ public class DrillClient implements Closeable, ConnectionThrottle { } /** - * Get the list of schemas in <code>INFORMATION_SCHEMA.SCHEMATA</code> table satisfying the given filters. + * Get the list of schemas in {@code INFORMATION_SCHEMA.SCHEMATA} table satisfying the given filters. * - * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter. - * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter. - * @return The list of schemas in <code>INFORMATION_SCHEMA.SCHEMATA</code> table satisfying the given filters. + * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter. 
+ * @param schemaNameFilter Filter on {@code schema name}. Pass null to apply no filter. + * @return The list of schemas in {@code INFORMATION_SCHEMA.SCHEMATA} table satisfying the given filters. */ public DrillRpcFuture<GetSchemasResp> getSchemas(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter) { final GetSchemasReq.Builder reqBuilder = GetSchemasReq.newBuilder(); @@ -695,13 +700,13 @@ public class DrillClient implements Closeable, ConnectionThrottle { } /** - * Get the list of tables in <code>INFORMATION_SCHEMA.TABLES</code> table satisfying the given filters. + * Get the list of tables in {@code INFORMATION_SCHEMA.TABLES} table satisfying the given filters. * - * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter. - * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter. - * @param tableNameFilter Filter in <code>table name</code>. Pass null to apply no filter. - * @param tableTypeFilter Filter in <code>table type</code>. Pass null to apply no filter - * @return The list of tables in <code>INFORMATION_SCHEMA.TABLES</code> table satisfying the given filters. + * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter. + * @param schemaNameFilter Filter on {@code schema name}. Pass null to apply no filter. + * @param tableNameFilter Filter in {@code table name}. Pass null to apply no filter. + * @param tableTypeFilter Filter in {@code table type}. Pass null to apply no filter + * @return The list of tables in {@code INFORMATION_SCHEMA.TABLES} table satisfying the given filters. */ public DrillRpcFuture<GetTablesResp> getTables(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter, LikeFilter tableNameFilter, List<String> tableTypeFilter) { @@ -726,13 +731,13 @@ public class DrillClient implements Closeable, ConnectionThrottle { } /** - * Get the list of columns in <code>INFORMATION_SCHEMA.COLUMNS</code> table satisfying the given filters. 
+ * Get the list of columns in {@code INFORMATION_SCHEMA.COLUMNS} table satisfying the given filters. * - * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter. - * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter. - * @param tableNameFilter Filter in <code>table name</code>. Pass null to apply no filter. - * @param columnNameFilter Filter in <code>column name</code>. Pass null to apply no filter. - * @return The list of columns in <code>INFORMATION_SCHEMA.COLUMNS</code> table satisfying the given filters. + * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter. + * @param schemaNameFilter Filter on {@code schema name}. Pass null to apply no filter. + * @param tableNameFilter Filter in {@code table name}. Pass null to apply no filter. + * @param columnNameFilter Filter in {@code column name}. Pass null to apply no filter. + * @return The list of columns in {@code INFORMATION_SCHEMA.COLUMNS} table satisfying the given filters. */ public DrillRpcFuture<GetColumnsResp> getColumns(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter, LikeFilter tableNameFilter, LikeFilter columnNameFilter) { @@ -757,10 +762,10 @@ public class DrillClient implements Closeable, ConnectionThrottle { } /** - * Create a prepared statement for given the <code>query</code>. + * Create a prepared statement for given the {@code query}. * * @param query - * @return The prepared statement for given the <code>query</code>. + * @return The prepared statement for given the {@code query}. 
*/ public DrillRpcFuture<CreatePreparedStatementResp> createPreparedStatement(final String query) { final CreatePreparedStatementReq req = diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java index 47a364a..c751897 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java @@ -250,7 +250,7 @@ public class DumpCat { System.out.println(getBatchMetaInfo(vcSerializable).toString()); System.out.println("Schema Information"); - for (final VectorWrapper w : vectorContainer) { + for (final VectorWrapper<?> w : vectorContainer) { final MaterializedField field = w.getValueVector().getField(); System.out.println (String.format("name : %s, minor_type : %s, data_mode : %s", field.getName(), @@ -279,7 +279,7 @@ public class DumpCat { selectedRows = vcSerializable.getSv2().getCount(); } - for (final VectorWrapper w : vectorContainer) { + for (final VectorWrapper<?> w : vectorContainer) { totalDataSize += w.getValueVector().getBufferSize(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/InvalidConnectionInfoException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/InvalidConnectionInfoException.java index 19e72ff..e90cfae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/InvalidConnectionInfoException.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/InvalidConnectionInfoException.java @@ -22,6 +22,7 @@ import org.apache.drill.exec.rpc.NonTransientRpcException; /** * Exception for malformed connection string from client */ +@SuppressWarnings("serial") public class InvalidConnectionInfoException extends NonTransientRpcException { public InvalidConnectionInfoException(String message) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java index 25e472f..25331da 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java @@ -36,11 +36,13 @@ import org.apache.drill.exec.rpc.user.UserResultsListener; import org.apache.drill.exec.util.VectorUtil; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.netty.buffer.DrillBuf; public class LoggingResultsListener implements UserResultsListener { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LoggingResultsListener.class); + private static Logger logger = LoggerFactory.getLogger(LoggingResultsListener.class); private final AtomicInteger count = new AtomicInteger(); private final Stopwatch w = Stopwatch.createUnstarted(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java index d60f6ae..5a5f725 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java @@ -201,7 +201,5 @@ public class QuerySubmitter { watch.reset(); } return 0; - } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java index c0c1e0b..7510cf4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.ops; -import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; +import 
org.apache.drill.exec.physical.impl.materialize.QueryDataPackage; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.rpc.UserClientConnection; @@ -31,14 +31,15 @@ public class AccountingUserConnection { private final SendingAccountor sendingAccountor; private final RpcOutcomeListener<Ack> statusHandler; - public AccountingUserConnection(UserClientConnection connection, SendingAccountor sendingAccountor, RpcOutcomeListener<Ack> statusHandler) { + public AccountingUserConnection(UserClientConnection connection, SendingAccountor sendingAccountor, + RpcOutcomeListener<Ack> statusHandler) { this.connection = connection; this.sendingAccountor = sendingAccountor; this.statusHandler = statusHandler; } - public void sendData(QueryWritableBatch batch) { + public void sendData(QueryDataPackage data) { sendingAccountor.increment(); - connection.sendData(statusHandler, batch); + connection.sendData(statusHandler, data); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index 0f7ca13..3f71cbd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -19,17 +19,14 @@ package org.apache.drill.exec.physical.impl; import java.util.List; -import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.ops.AccountingUserConnection; import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.RootFragmentContext; import org.apache.drill.exec.physical.config.Screen; -import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; -import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer; +import 
org.apache.drill.exec.physical.impl.materialize.QueryDataPackage.DataPackage; +import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage.EmptyResultsPackage; import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer; -import org.apache.drill.exec.proto.UserBitShared.QueryData; -import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.testing.ControlsInjector; @@ -49,13 +46,18 @@ public class ScreenCreator implements RootCreator<Screen> { return new ScreenRoot(context, children.iterator().next(), config); } + /** + * Transfer batches to a user connection. The user connection is typically a + * network connection, but may be internal for a web or REST client. Data is + * sent as a "package", allowing the network client to request serialization, + * and the internal client to just transfer buffer ownership. + */ public static class ScreenRoot extends BaseRootExec { private static final Logger logger = LoggerFactory.getLogger(ScreenRoot.class); private final RecordBatch incoming; private final RootFragmentContext context; private final AccountingUserConnection userConnection; - private RecordMaterializer materializer; - + private DataPackage dataPackage; private boolean firstBatch = true; public enum Metric implements MetricDef { @@ -67,15 +69,11 @@ public class ScreenCreator implements RootCreator<Screen> { } } - public ScreenRoot(RootFragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException { + public ScreenRoot(RootFragmentContext context, RecordBatch incoming, Screen config) { super(context, config); this.context = context; this.incoming = incoming; - userConnection = context.getUserDataTunnel(); - } - - public RootFragmentContext getContext() { - return context; + this.userConnection = context.getUserDataTunnel(); } @Override @@ -85,53 +83,40 @@ public 
class ScreenCreator implements RootCreator<Screen> { switch (outcome) { case NONE: if (firstBatch) { - // this is the only data message sent to the client and may contain the schema - QueryWritableBatch batch; - QueryData header = QueryData.newBuilder() - .setQueryId(context.getHandle().getQueryId()) - .setRowCount(0) - .setDef(RecordBatchDef.getDefaultInstance()) - .build(); - batch = new QueryWritableBatch(header); stats.startWait(); try { - userConnection.sendData(batch); + // This is the only data message sent to the client and does not contain the schema + userConnection.sendData(new EmptyResultsPackage(context.getHandle().getQueryId())); } finally { stats.stopWait(); } firstBatch = false; // we don't really need to set this. But who knows! } - return false; + case OK_NEW_SCHEMA: - materializer = new VectorRecordMaterializer(context, oContext, incoming); + dataPackage = new DataPackage(new VectorRecordMaterializer(context, oContext, incoming), stats); //$FALL-THROUGH$ case OK: injector.injectPause(context.getExecutionControls(), "sending-data", logger); - final QueryWritableBatch batch = materializer.convertNext(); - updateStats(batch); stats.startWait(); try { - userConnection.sendData(batch); + // Stats updated if connection serializes the batch + userConnection.sendData(dataPackage); } finally { stats.stopWait(); } firstBatch = false; - return true; + default: throw new UnsupportedOperationException(outcome.name()); } } - public void updateStats(QueryWritableBatch queryBatch) { - stats.addLongStat(Metric.BYTES_SENT, queryBatch.getByteCount()); - } - - RecordBatch getIncoming() { - return incoming; - } + public RootFragmentContext getContext() { return context; } + protected RecordBatch getIncoming() { return incoming; } @Override public void close() throws Exception { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java index 
a3d03c2..ec22632 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java @@ -24,9 +24,12 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; public interface Filterer { - TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class); - TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class); + TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = + new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class); + TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = + new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class); - void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException; + void setup(FragmentContext context, RecordBatch incoming, + RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException; void filterBatch(int recordCount) throws SchemaChangeException; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryDataPackage.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryDataPackage.java new file mode 100644 index 0000000..f9a933d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryDataPackage.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.materialize; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot.Metric; +import org.apache.drill.exec.proto.UserBitShared.QueryData; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; +import org.apache.drill.exec.proto.UserBitShared.SerializedField; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; + +/** + * Packages a batch from the Screen operator to send to its + * user connection. In the original Drill, that connection was always a + * network connection, and so the outgoing batch is serialized to a set + * of buffers ready to send. However, the REST server runs in the same process. + * The original REST query implementation serialized the data to buffers, then + * copied the data to a big buffer to be deserialized, causing significant memory + * pressure. This version allows the user connection to elect for serialization, + * or just to access the original source batch. 
+ */ +public interface QueryDataPackage { + + QueryId queryId(); + QueryWritableBatch toWritableBatch(); + VectorContainer batch(); + List<SerializedField> fields(); + + /** + * Package that contains only a query ID. Send for a query that + * finishes with no data. The results are null: no data, no schema. + */ + public static class EmptyResultsPackage implements QueryDataPackage { + + private final QueryId queryId; + + public EmptyResultsPackage(QueryId queryId) { + this.queryId = queryId; + } + + @Override + public QueryId queryId() { return queryId; } + + /** + * Creates a message that sends only the query ID to the + * client. + */ + @Override + public QueryWritableBatch toWritableBatch() { + QueryData header = QueryData.newBuilder() + .setQueryId(queryId) + .setRowCount(0) + .setDef(RecordBatchDef.getDefaultInstance()) + .build(); + return new QueryWritableBatch(header); + } + + @Override + public VectorContainer batch() { return null; } + + @Override + public List<SerializedField> fields() { + return Collections.emptyList(); + } + } + + /** + * Represents a batch of data with a schema. 
+ */ + public static class DataPackage implements QueryDataPackage { + private final RecordMaterializer materializer; + private final OperatorStats stats; + + public DataPackage(RecordMaterializer materializer, OperatorStats stats) { + this.materializer = materializer; + this.stats = stats; + } + + @Override + public QueryId queryId() { return materializer.queryId(); } + + @Override + public QueryWritableBatch toWritableBatch() { + QueryWritableBatch batch = materializer.convertNext(); + stats.addLongStat(Metric.BYTES_SENT, batch.getByteCount()); + return batch; + } + + @Override + public VectorContainer batch() { + return materializer.incoming(); + } + + @Override + public List<SerializedField> fields() { + List<SerializedField> metadata = new ArrayList<>(); + for (VectorWrapper<?> vw : batch()) { + metadata.add(vw.getValueVector().getMetadata()); + } + return metadata; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java index e69bd51..f9537b0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java @@ -24,7 +24,6 @@ import java.util.Arrays; import org.apache.drill.exec.proto.UserBitShared.QueryData; public class QueryWritableBatch { -// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class); private final QueryData header; private final ByteBuf[] buffers; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java index 75de592..9c2f7ca 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java @@ -17,9 +17,14 @@ */ package org.apache.drill.exec.physical.impl.materialize; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.record.VectorContainer; public interface RecordMaterializer { public QueryWritableBatch convertNext(); + public QueryId queryId(); + + public VectorContainer incoming(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java index 7cdf9b3..c294774 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java @@ -25,16 +25,16 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryData; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.WritableBatch; -import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.server.options.OptionSet; public class VectorRecordMaterializer implements RecordMaterializer { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorRecordMaterializer.class); - private QueryId queryId; - private RecordBatch batch; - private BufferAllocator allocator; - private OptionManager options; + private final QueryId queryId; + private final RecordBatch batch; + private final BufferAllocator allocator; + private final boolean resultResultsForDDL; public VectorRecordMaterializer(FragmentContext 
context, OperatorContext oContext, RecordBatch batch) { this.queryId = context.getHandle().getQueryId(); @@ -42,19 +42,27 @@ public class VectorRecordMaterializer implements RecordMaterializer { this.allocator = oContext.getAllocator(); BatchSchema schema = batch.getSchema(); assert schema != null : "Schema must be defined."; - options = context.getOptions(); + OptionSet options = context.getOptions(); + this.resultResultsForDDL = options.getBoolean(ExecConstants.RETURN_RESULT_SET_FOR_DDL); } + @Override public QueryWritableBatch convertNext() { WritableBatch w = batch.getWritableBatch().transfer(allocator); QueryData.Builder builder = QueryData.newBuilder() .setQueryId(queryId) .setRowCount(batch.getRecordCount()) .setDef(w.getDef()); - if (!options.getBoolean(ExecConstants.RETURN_RESULT_SET_FOR_DDL)) { + if (!resultResultsForDDL) { int count = w.getDef().getAffectedRowsCount(); builder.setAffectedRowsCount(count == -1 ? 0 : count); } return new QueryWritableBatch(builder.build(), w.getBuffers()); } + + @Override + public QueryId queryId() { return queryId; } + + @Override + public VectorContainer incoming() { return batch.getContainer(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java index cb835e7..d4087fb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java @@ -25,7 +25,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; * Implements an AbstractUnaryRecordBatch where the incoming record batch is * known at the time of creation * - * @param <T> + * @param <T> the plan definition of the operator */ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> extends AbstractUnaryRecordBatch<T> { @@ -43,12 +43,15 @@ public abstract class 
AbstractSingleRecordBatch<T extends PhysicalOperator> exte } /** - * Based on lastKnownOutcome and if there are more records to be output for current record boundary detected by - * EMIT outcome, this method returns EMIT or OK outcome. + * Based on lastKnownOutcome and if there are more records to be output for + * current record boundary detected by EMIT outcome, this method returns EMIT + * or OK outcome. + * * @param hasMoreRecordInBoundary - * @return - EMIT - If the lastknownOutcome was EMIT and output records corresponding to all the incoming records in - * current record boundary is already produced. - * - OK - otherwise + * @return EMIT - If the lastknownOutcome was EMIT and output records + * corresponding to all the incoming records in current record + * boundary is already produced. + * OK - otherwise */ protected IterOutcome getFinalOutcome(boolean hasMoreRecordInBoundary) { final IterOutcome lastOutcome = getLastKnownOutcome(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java index 6d8865d..2ee1047 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java @@ -30,7 +30,6 @@ import org.apache.drill.common.types.TypeProtos.MajorType; * {@link org.apache.drill.exec.record.metadata.TupleMetadata} instead. 
*/ public class BatchSchema implements Iterable<MaterializedField> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchSchema.class); private final SelectionVectorMode selectionVectorMode; private final List<MaterializedField> fields; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java index 064c601..1eb2ac0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java @@ -51,7 +51,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp private final static Logger logger = LoggerFactory.getLogger(RecordBatchLoader.class); private final BufferAllocator allocator; - private VectorContainer container = new VectorContainer(); + private VectorContainer container; private int valueCount; private BatchSchema schema; @@ -60,6 +60,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp */ public RecordBatchLoader(BufferAllocator allocator) { this.allocator = Preconditions.checkNotNull(allocator); + this.container = new VectorContainer(allocator); } public BufferAllocator allocator() { return allocator; } @@ -116,12 +117,10 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp vector = TypeHelper.getNewVector(fieldDef, allocator); // If the field is a map or a dict, check if the schema changed. - } else if ((vector.getField().getType().getMinorType() == MinorType.MAP || vector.getField().getType().getMinorType() == MinorType.DICT) && ! isSameSchema(vector.getField().getChildren(), field.getChildList())) { // The schema changed. Discard the old one and create a new one. 
- schemaChanged = true; vector.clear(); vector = TypeHelper.getNewVector(fieldDef, allocator); @@ -155,8 +154,8 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp container = newVectors; container.setRecordCount(valueCount); } catch (final Throwable cause) { - // We have to clean up new vectors created here and pass over the actual cause. It is upper layer who should - // adjudicate to call upper layer specific clean up logic. + // We have to clean up new vectors created here and pass over the actual cause. + // It is upper layer who should adjudicate to call upper layer specific clean up logic. VectorAccessibleUtilities.clear(newVectors); throw cause; } finally { @@ -190,7 +189,6 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp // Column order can permute (see DRILL-5828). So, use a map // for matching. - Map<String, MaterializedField> childMap = CaseInsensitiveMap.newHashMap(); for (MaterializedField currentChild : currentChildren) { childMap.put(currentChild.getName(), currentChild); @@ -199,13 +197,11 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp MaterializedField currentChild = childMap.get(newChild.getNamePart().getName()); // New map member? - if (currentChild == null) { return false; } // Changed data type? - if (! currentChild.getType().equals(newChild.getMajorType())) { return false; } @@ -223,7 +219,6 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp } // Everything matches. - return true; } @@ -232,20 +227,6 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp return container.getValueVectorId(path); } -// -// @SuppressWarnings("unchecked") -// public <T extends ValueVector> T getValueVectorId(int fieldId, Class<?> clazz) { -// ValueVector v = container.get(fieldId); -// assert v != null; -// if (v.getClass() != clazz){ -// logger.warn(String.format( -// "Failure while reading vector. 
Expected vector class of %s but was holding vector class %s.", -// clazz.getCanonicalName(), v.getClass().getCanonicalName())); -// return null; -// } -// return (T) v; -// } - @Override public int getRecordCount() { return valueCount; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java index 966ade7..1b30d78 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.record; import io.netty.buffer.DrillBuf; +import java.util.ArrayList; import java.util.List; import org.apache.drill.exec.memory.BufferAllocator; @@ -28,7 +29,6 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; /** * A specialized version of record batch that can moves out buffers and preps @@ -52,7 +52,7 @@ public class WritableBatch implements AutoCloseable { } public WritableBatch transfer(BufferAllocator allocator) { - List<DrillBuf> newBuffers = Lists.newArrayList(); + List<DrillBuf> newBuffers = new ArrayList<>(); for (DrillBuf buf : buffers) { int writerIndex = buf.writerIndex(); DrillBuf newBuf = buf.transferOwnership(allocator).buffer; @@ -135,7 +135,7 @@ public class WritableBatch implements AutoCloseable { } public static WritableBatch getBatchNoHVWrap(int recordCount, Iterable<VectorWrapper<?>> vws, boolean isSV2) { - List<ValueVector> vectors = Lists.newArrayList(); + List<ValueVector> vectors = new ArrayList<>(); for (VectorWrapper<?> vw : vws) { Preconditions.checkArgument(!vw.isHyper()); vectors.add(vw.getValueVector()); @@ -144,8 +144,8 @@ public class WritableBatch implements AutoCloseable { } 
public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) { - List<DrillBuf> buffers = Lists.newArrayList(); - List<SerializedField> metadata = Lists.newArrayList(); + List<DrillBuf> buffers = new ArrayList<>(); + List<SerializedField> metadata = new ArrayList<>(); for (ValueVector vv : vectors) { metadata.add(vv.getMetadata()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java index 2934c34..0fd2862 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java @@ -18,6 +18,8 @@ package org.apache.drill.exec.rpc; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.DrillPBError; import org.apache.drill.exec.proto.UserBitShared.QueryId; @@ -29,13 +31,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** - * Helps to run a query and await on the results. All the inheriting sub-class manages the session/connection - * state and submits query with respect to that state. The subclass instance lifetime is per query lifetime - * and is not re-used. + * Helps to run a query and await on the results. All the inheriting sub-class + * manages the session/connection state and submits query with respect to that + * state. The subclass instance lifetime is per query lifetime and is not + * re-used. 
*/ public abstract class AbstractDisposableUserClientConnection implements UserClientConnection { - private static final org.slf4j.Logger logger = - org.slf4j.LoggerFactory.getLogger(AbstractDisposableUserClientConnection.class); + private static final Logger logger = + LoggerFactory.getLogger(AbstractDisposableUserClientConnection.class); protected final CountDownLatch latch = new CountDownLatch(1); @@ -72,7 +75,8 @@ public abstract class AbstractDisposableUserClientConnection implements UserClie final QueryId queryId = result.getQueryId(); if (logger.isDebugEnabled()) { - logger.debug("Result arrived for QueryId: {} with QueryState: {}", QueryIdHelper.getQueryId(queryId), state); + logger.debug("Result arrived for QueryId: {} with QueryState: {}", + QueryIdHelper.getQueryId(queryId), state); } switch (state) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java index 1372b29..179cc7c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java @@ -18,7 +18,8 @@ package org.apache.drill.exec.rpc; import io.netty.channel.ChannelFuture; -import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; + +import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.QueryResult; import org.apache.drill.exec.rpc.user.UserSession; @@ -26,12 +27,14 @@ import org.apache.drill.exec.rpc.user.UserSession; import java.net.SocketAddress; /** - * Interface for getting user session properties and interacting with user connection. 
Separating this interface from - * {@link AbstractRemoteConnection} implementation for user connection: + * Interface for getting user session properties and interacting with user + * connection. Separating this interface from {@link AbstractRemoteConnection} + * implementation for user connection: * <p><ul> - * <li> Connection is passed to Foreman and Screen operators. Instead passing this interface exposes few details. - * <li> Makes it easy to have wrappers around user connection which can be helpful to tap the messages and data - * going to the actual client. + * <li>Connection is passed to Foreman and Screen operators. Instead passing + * this interface exposes few details. + * <li>Makes it easy to have wrappers around user connection which can be + * helpful to tap the messages and data going to the actual client. * </ul> */ public interface UserClientConnection { @@ -41,7 +44,7 @@ public interface UserClientConnection { UserSession getSession(); /** - * Send query result outcome to client. Outcome is returned through <code>listener</code> + * Send query result outcome to client. Outcome is returned through {@code listener}. * * @param listener * @param result @@ -49,12 +52,12 @@ public interface UserClientConnection { void sendResult(RpcOutcomeListener<Ack> listener, QueryResult result); /** - * Send query data to client. Outcome is returned through <code>listener</code> + * Send query data to client. Outcome is returned through {@code listener}. * * @param listener * @param result */ - void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result); + void sendData(RpcOutcomeListener<Ack> listener, QueryDataPackage data); /** * Returns the {@link ChannelFuture} which will be notified when this @@ -66,4 +69,4 @@ public interface UserClientConnection { * @return Return the client node address. 
*/ SocketAddress getRemoteAddress(); -} \ No newline at end of file +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java index 461d8aa..25f1c68 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java @@ -78,5 +78,4 @@ public class AwaitableUserResultsListener implements UserResultsListener { } return count.get(); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java index 99b0723..460dbb7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java @@ -120,13 +120,13 @@ public class QueryResultHandler { try { if (isFailureResult) { - // Failure case--pass on via submissionFailed(...). + // Failure case--pass on via submissionFailed(...). resultsListener.submissionFailed(new UserRemoteException(queryResult.getError(0))); // Note: Listener is removed in finally below. } else if (isTerminalResult) { - // A successful completion/canceled case--pass on via resultArrived + // A successful completion/canceled case--pass on via resultArrived try { resultsListener.queryCompleted(queryState); } catch (Exception e) { @@ -189,9 +189,9 @@ public class QueryResultHandler { UserResultsListener resultsListener = queryIdToResultsListenersMap.get(queryId); logger.trace("For QueryId [{}], retrieved results listener {}", queryId, resultsListener); if (null == resultsListener) { + // WHO?? 
didn't get query ID response and set submission listener yet, // so install a buffering listener for now - BufferingResultsListener bl = new BufferingResultsListener(); resultsListener = queryIdToResultsListenersMap.putIfAbsent(queryId, bl); // If we had a successful insertion, use that reference. Otherwise, just @@ -272,8 +272,7 @@ public class QueryResultHandler { } @Override - public void queryIdArrived(QueryId queryId) { - } + public void queryIdArrived(QueryId queryId) { } } private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java index 1d864f2..190e9ed 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java @@ -25,14 +25,21 @@ import org.apache.drill.exec.rpc.ConnectionThrottle; public interface UserResultsListener { /** - * QueryId is available. Called when a query is successfully submitted to the server. - * @param queryId sent by the server along {@link org.apache.drill.exec.rpc.Acks.OK Acks.OK} + * QueryId is available. Called when a query is successfully submitted to the + * server. + * + * @param queryId + * sent by the server along {@link org.apache.drill.exec.rpc.Acks.OK + * Acks.OK} */ void queryIdArrived(QueryId queryId); /** - * The query has failed. Most likely called when the server returns a FAILED query state. Can also be called if - * {@link #dataArrived(QueryDataBatch, ConnectionThrottle) dataArrived()} throws an exception + * The query has failed. Most likely called when the server returns a FAILED + * query state. 
Can also be called if + * {@link #dataArrived(QueryDataBatch, ConnectionThrottle) dataArrived()} + * throws an exception + * + * @param ex exception describing the cause of the failure */ void submissionFailed(UserException ex); @@ -45,10 +52,9 @@ void dataArrived(QueryDataBatch result, ConnectionThrottle throttle); /** - * The query has completed (successsful completion or cancellation). The listener will not receive any other - * data or result message. Called when the server returns a terminal-non failing- state (COMPLETED or CANCELLED) - * @param state + * The query has completed (successful completion or cancellation). The + * listener will not receive any other data or result message. Called when the + * server returns a terminal, non-failing state (COMPLETED or CANCELLED) */ void queryCompleted(QueryState state); - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java index 276758e..cb1db13 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java @@ -31,6 +31,7 @@ import org.apache.drill.common.config.DrillProperties; import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage; import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode; @@ -63,6 +64,7 @@ import org.apache.drill.exec.work.user.UserWorker; import org.apache.hadoop.security.HadoopKerberosName; import org.joda.time.DateTime; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.net.ssl.SSLEngine; 
import javax.security.sasl.SaslException; @@ -75,7 +77,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; public class UserServer extends BasicServer<RpcType, BitToUserConnection> { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class); + private static final Logger logger = LoggerFactory.getLogger(UserServer.class); private static final String SERVER_NAME = "Apache Drill Server"; private final UserConnectionConfig config; @@ -84,7 +86,7 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> { private final UserWorker userWorker; private static final ConcurrentHashMap<BitToUserConnection, BitToUserConnectionConfig> userConnectionMap; - //Initializing the singleton map during startup + // Initialize the singleton map during startup static { userConnectionMap = new ConcurrentHashMap<>(); } @@ -195,11 +197,13 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> { } /** - * It represents a client connection accepted by Foreman Drillbit's UserServer from a DrillClient. This connection - * is used to get hold of {@link UserSession} which stores all session related information like session options - * changed over the lifetime of this connection. There is a 1:1 mapping between a BitToUserConnection and a - * UserSession. This connection object is also used to send query data and result back to the client submitted as part - * of the session tied to this connection. + * Represents a client connection accepted by Foreman Drillbit's UserServer + * from a DrillClient. This connection is used to get hold of + * {@link UserSession} which stores all session related information like + * session options changed over the lifetime of this connection. There is a + * 1:1 mapping between a BitToUserConnection and a UserSession. 
This + * connection object is also used to send query data and result back to the + * client submitted as part of the session tied to this connection. */ public class BitToUserConnection extends AbstractServerConnection<BitToUserConnection> implements UserClientConnection { @@ -240,7 +244,6 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> { * Sets the user on the session, and finalizes the session. * * @param userName user name to set on the session - * */ void finalizeSession(String userName) { // create a session @@ -262,9 +265,10 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> { } @Override - public UserSession getSession(){ - return session; - } + public UserSession getSession() { return session; } + + @Override + protected Logger getLogger() { return logger; } @Override public void sendResult(final RpcOutcomeListener<Ack> listener, final QueryResult result) { @@ -273,17 +277,13 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> { } @Override - public void sendData(final RpcOutcomeListener<Ack> listener, final QueryWritableBatch result) { + public void sendData(final RpcOutcomeListener<Ack> listener, final QueryDataPackage data) { + QueryWritableBatch result = data.toWritableBatch(); logger.trace("Sending data to client with {}", result); send(listener, this, RpcType.QUERY_DATA, result.getHeader(), Ack.class, false, result.getBuffers()); } @Override - protected Logger getLogger() { - return logger; - } - - @Override public ChannelFuture getChannelClosureFuture() { return getChannel().closeFuture() .addListener(new GenericFutureListener<Future<? 
super Void>>() { @@ -504,10 +504,10 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> { * User Connection's config for System Table access */ public class BitToUserConnectionConfig { - private DateTime established; - private boolean isAuthEnabled; - private boolean isEncryptionEnabled; - private boolean isSSLEnabled; + private final DateTime established; + private final boolean isAuthEnabled; + private final boolean isEncryptionEnabled; + private final boolean isSSLEnabled; public BitToUserConnectionConfig() { established = new DateTime(); //Current Joda-based Time diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java index 0798dea..a7594f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java @@ -49,11 +49,13 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class UserSession implements AutoCloseable { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserSession.class); + private static final Logger logger = LoggerFactory.getLogger(UserSession.class); - private boolean supportComplexTypes = false; + private boolean supportComplexTypes; private UserCredentials credentials; private DrillProperties properties; private SessionOptionManager sessionOptions; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java new file mode 100644 index 0000000..42d93f2 --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.server.rest; + +import java.net.SocketAddress; + +import org.apache.drill.exec.rpc.AbstractDisposableUserClientConnection; +import org.apache.drill.exec.rpc.ConnectionThrottle; +import org.apache.drill.exec.rpc.user.UserSession; + +import io.netty.channel.ChannelFuture; + +public abstract class BaseWebUserConnection extends AbstractDisposableUserClientConnection implements ConnectionThrottle { + + protected WebSessionResources webSessionResources; + + public BaseWebUserConnection(WebSessionResources webSessionResources) { + this.webSessionResources = webSessionResources; + } + + @Override + public UserSession getSession() { + return webSessionResources.getSession(); + } + + @Override + public ChannelFuture getChannelClosureFuture() { + return webSessionResources.getCloseFuture(); + } + + @Override + public SocketAddress getRemoteAddress() { + return webSessionResources.getRemoteAddress(); + } + + @Override + public void setAutoRead(boolean enableAutoRead) { } + + public WebSessionResources resources() { + return 
webSessionResources; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java index 141c027..c345571 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java @@ -237,9 +237,7 @@ public class DrillRestServer extends ResourceConfig { } @Override - public void dispose(WebUserConnection instance) { - - } + public void dispose(WebUserConnection instance) { } } public static class AnonWebUserConnectionProvider implements Factory<WebUserConnection> { @@ -300,14 +298,14 @@ public class DrillRestServer extends ResourceConfig { } @Override - public void dispose(WebUserConnection instance) { - - } + public void dispose(WebUserConnection instance) { } /** - * Creates session user principal. If impersonation is enabled without authentication and User-Name header is present and valid, - * will create session user principal with provided user name, otherwise anonymous user name will be used. - * In both cases session user principal will have admin rights. + * Creates session user principal. If impersonation is enabled without + * authentication and User-Name header is present and valid, will create + * session user principal with provided user name, otherwise anonymous user + * name will be used. In both cases session user principal will have admin + * rights. 
* * @param config drill config * @param request client request @@ -322,10 +320,12 @@ public class DrillRestServer extends ResourceConfig { } return new AnonDrillUserPrincipal(); } - } - // Provider which injects DrillUserPrincipal directly instead of getting it from SecurityContext and typecasting + /** + * Provider which injects DrillUserPrincipal directly instead of getting it + * from SecurityContext and typecasting + */ public static class DrillUserPrincipalProvider implements Factory<DrillUserPrincipal> { @Inject HttpServletRequest request; @@ -336,9 +336,7 @@ public class DrillRestServer extends ResourceConfig { } @Override - public void dispose(DrillUserPrincipal principal) { - // No-Op - } + public void dispose(DrillUserPrincipal principal) { } } // Provider which creates and cleanups DrillUserPrincipal for anonymous (auth disabled) mode diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java index 4aa2061..e29050a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java @@ -150,7 +150,7 @@ public class LogsResources { private File getFileByName(File folder, final String name) { File[] files = folder.listFiles((dir, fileName) -> fileName.equals(name)); if (files.length == 0) { - throw new DrillRuntimeException (name + " doesn't exist"); + throw new DrillRuntimeException(name + " doesn't exist"); } return files[0]; } @@ -159,9 +159,9 @@ public class LogsResources { @XmlRootElement public class Log implements Comparable<Log> { - private String name; - private long size; - private DateTime lastModified; + private final String name; + private final long size; + private final DateTime lastModified; @JsonCreator public Log (@JsonProperty("name") String name, @JsonProperty("size") long size, 
@JsonProperty("lastModified") long lastModified) { @@ -190,9 +190,9 @@ public class LogsResources { @XmlRootElement public class LogContent { - private String name; - private Collection<String> lines; - private int maxLines; + private final String name; + private final Collection<String> lines; + private final int maxLines; @JsonCreator public LogContent (@JsonProperty("name") String name, @JsonProperty("lines") Collection<String> lines, @JsonProperty("maxLines") int maxLines) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java index 94fcea9..be4c331 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java @@ -22,6 +22,7 @@ import java.util.Map; import javax.xml.bind.annotation.XmlRootElement; import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.exec.proto.UserBitShared.QueryType; import org.apache.drill.shaded.guava.com.google.common.base.CharMatcher; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.parquet.Strings; @@ -98,7 +99,7 @@ public class QueryWrapper { public static final class RestQueryBuilder { private String query; - private String queryType = "SQL"; + private String queryType = QueryType.SQL.name(); private int rowLimit; private String userName; private String defaultSchema; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/RestQueryRunner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/RestQueryRunner.java index e944386..69d81ae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/RestQueryRunner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/RestQueryRunner.java @@ -62,7 +62,7 @@ public class RestQueryRunner { this.options = 
webUserConnection.getSession().getOptions(); } - public RestQueryRunner.QueryResult run() throws Exception { + public QueryResult run() throws Exception { applyUserName(); applyOptions(); applyDefaultSchema(); @@ -131,7 +131,7 @@ public class RestQueryRunner { return maxRows; } - public RestQueryRunner.QueryResult submitQuery(int maxRows) { + public QueryResult submitQuery(int maxRows) { final RunQuery runQuery = RunQuery.newBuilder() .setType(QueryType.valueOf(query.getQueryType())) .setPlan(query.getQuery()) @@ -161,7 +161,7 @@ public class RestQueryRunner { } } while (!isComplete && !nearlyOutOfHeapSpace); - //Fail if nearly out of heap space + // Fail if nearly out of heap space if (nearlyOutOfHeapSpace) { UserException almostOutOfHeapException = UserException.resourceError() .message("There is not enough heap memory to run this query using the web interface. ") @@ -185,7 +185,7 @@ public class RestQueryRunner { return new QueryResult(queryId, webUserConnection, webUserConnection.results); } - //Detect possible excess heap + // Detect possible excess heap private float getHeapUsage() { return (float) memMXBean.getHeapMemoryUsage().getUsed() / memMXBean.getHeapMemoryUsage().getMax(); } @@ -198,15 +198,16 @@ public class RestQueryRunner { public final String queryState; public final int attemptedAutoLimit; - //DRILL-6847: Modified the constructor so that the method has access to all the properties in webUserConnection + // DRILL-6847: Modified the constructor so that the method has access + // to all the properties in webUserConnection public QueryResult(QueryId queryId, WebUserConnection webUserConnection, List<Map<String, String>> rows) { - this.queryId = QueryIdHelper.getQueryId(queryId); - this.columns = webUserConnection.columns; - this.metadata = webUserConnection.metadata; - this.queryState = webUserConnection.getQueryState(); - this.rows = rows; - this.attemptedAutoLimit = webUserConnection.getAutoLimitRowCount(); - } + this.queryId = 
QueryIdHelper.getQueryId(queryId); + this.columns = webUserConnection.columns; + this.metadata = webUserConnection.metadata; + this.queryState = webUserConnection.getQueryState(); + this.rows = rows; + this.attemptedAutoLimit = webUserConnection.getAutoLimitRowCount(); + } public String getQueryId() { return queryId; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java index 6819457..e00cb48 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java @@ -262,12 +262,11 @@ public class WebServer implements AutoCloseable { responseHeadersSettingFilter.setInitParameters(ResponseHeadersSettingFilter.retrieveResponseHeaders(config)); servletContextHandler.addFilter(responseHeadersSettingFilter, "/*", EnumSet.of(DispatcherType.REQUEST)); - return servletContextHandler; } /** - * It creates A {@link SessionHandler} which contains a {@link HashSessionManager} + * Create a {@link SessionHandler} which contains a {@link HashSessionManager} * * @param securityHandler Set of init parameters that are used by the Authentication * @return session handler @@ -279,9 +278,7 @@ public class WebServer implements AutoCloseable { sessionManager.getSessionCookieConfig().setHttpOnly(true); sessionManager.addEventListener(new HttpSessionListener() { @Override - public void sessionCreated(HttpSessionEvent se) { - - } + public void sessionCreated(HttpSessionEvent se) { } @Override public void sessionDestroyed(HttpSessionEvent se) { @@ -338,8 +335,9 @@ public class WebServer implements AutoCloseable { } /** - * Create an HTTPS connector for given jetty server instance. If the admin has specified keystore/truststore settings - * they will be used else a self-signed certificate is generated and used. + * Create an HTTPS connector for given jetty server instance. 
If the admin has + * specified keystore/truststore settings they will be used else a self-signed + * certificate is generated and used. * * @return Initialized {@link ServerConnector} for HTTPS connections. */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java index 016278f..c06770e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java @@ -22,22 +22,23 @@ import org.apache.drill.common.AutoCloseables; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.rpc.ChannelClosedException; import org.apache.drill.exec.rpc.user.UserSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.SocketAddress; /** - * Class holding all the resources required for Web User Session. This class is responsible for the proper cleanup of - * all the resources. + * Holds the resources required for Web User Session. This class is responsible + * for the proper cleanup of all the resources. 
*/ public class WebSessionResources implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(WebSessionResources.class); - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WebSessionResources.class); - - private BufferAllocator allocator; + private final BufferAllocator allocator; private final SocketAddress remoteAddress; - private UserSession webUserSession; + private final UserSession webUserSession; private ChannelPromise closeFuture; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java index 0ca6abe..9427fc3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java @@ -17,177 +17,128 @@ */ package org.apache.drill.exec.server.rest; -import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.util.ValueVectorElementFormatter; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.shaded.guava.com.google.common.collect.Maps; import org.apache.drill.shaded.guava.com.google.common.collect.Sets; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.DrillBuf; -import io.netty.channel.ChannelFuture; -import org.apache.drill.common.exceptions.UserException; + import org.apache.drill.common.types.TypeProtos; -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; +import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; -import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; -import 
org.apache.drill.exec.rpc.AbstractDisposableUserClientConnection; import org.apache.drill.exec.rpc.Acks; -import org.apache.drill.exec.rpc.ConnectionThrottle; import org.apache.drill.exec.rpc.RpcOutcomeListener; -import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.vector.ValueVector.Accessor; +import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; -import java.net.SocketAddress; import java.util.List; import java.util.Map; import java.util.ArrayList; import java.util.Set; /** - * WebUserConnectionWrapper which represents the UserClientConnection between WebServer and Foreman, for the WebUser - * submitting the query. It provides access to the UserSession executing the query. There is no actual physical + * {@code WebUserConnectionWrapper} which represents the {@code UserClientConnection} between + * WebServer and Foreman, for the WebUser submitting the query. It provides + * access to the {@code UserSession} executing the query. There is no actual physical * channel corresponding to this connection wrapper. * - * It returns a close future with no actual underlying {@link io.netty.channel.Channel} associated with it but do have an - * EventExecutor out of BitServer EventLoopGroup. Since there is no actual connection established using this class, - * hence the close event will never be fired by underlying layer and close future is set only when the + * It returns a close future with no actual underlying + * {@link io.netty.channel.Channel} associated with it but do have an + * {@code EventExecutor} out of BitServer EventLoopGroup. Since there is no actual + * connection established using this class, hence the close event will never be + * fired by underlying layer and close future is set only when the * {@link WebSessionResources} are closed. 
*/ - -public class WebUserConnection extends AbstractDisposableUserClientConnection implements ConnectionThrottle { - - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WebUserConnection.class); - - protected WebSessionResources webSessionResources; +public class WebUserConnection extends BaseWebUserConnection { public final List<Map<String, String>> results = Lists.newArrayList(); - public final Set<String> columns = Sets.newLinkedHashSet(); - public final List<String> metadata = new ArrayList<>(); - private int autoLimitRowCount; + private int rowCount; WebUserConnection(WebSessionResources webSessionResources) { - this.webSessionResources = webSessionResources; + super(webSessionResources); } @Override - public UserSession getSession() { - return webSessionResources.getSession(); + public void sendData(RpcOutcomeListener<Ack> listener, QueryDataPackage data) { + processBatch(data.batch()); + listener.success(Acks.OK, null); } - @Override - public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result) { - // There can be overflow here but DrillBuf doesn't support allocating with - // bytes in long. Hence we are just preserving the earlier behavior and logging debug log for the case. - final int dataByteCount = (int) result.getByteCount(); - - if (dataByteCount < 0) { - if (logger.isDebugEnabled()) { - logger.debug("There is BufferOverflow in dataByteCount: {}", - dataByteCount); - } - listener.success(Acks.OK, null); + private void processBatch(VectorContainer batch) { + if (batch == null) { + // Empty query: no data, no schema. return; } - // Create a ByteBuf with all the data in it. 
- final int rows = result.getHeader().getRowCount(); - final BufferAllocator allocator = webSessionResources.getAllocator(); - final DrillBuf bufferWithData = allocator.buffer(dataByteCount); - try { - final ByteBuf[] resultDataBuffers = result.getBuffers(); - - for (final ByteBuf buffer : resultDataBuffers) { - bufferWithData.writeBytes(buffer); - buffer.release(); - } - - final RecordBatchLoader loader = new RecordBatchLoader(allocator); - try { - loader.load(result.getHeader().getDef(), bufferWithData); - // TODO: Clean: DRILL-2933: That load(...) no longer throws - // SchemaChangeException, so check/clean catch clause below. - for (int i = 0; i < loader.getSchema().getFieldCount(); ++i) { - //DRILL-6847: This section adds query metadata to the REST results - MaterializedField col = loader.getSchema().getColumn(i); - columns.add(col.getName()); - StringBuilder dataType = new StringBuilder(col.getType().getMinorType().name()); - - //For DECIMAL type - if (col.getType().hasPrecision()) { - dataType.append("("); - dataType.append(col.getType().getPrecision()); - - if (col.getType().hasScale()) { - dataType.append(", "); - dataType.append(col.getType().getScale()); - } - - dataType.append(")"); - } else if (col.getType().hasWidth()) { - //Case for VARCHAR columns with specified width - dataType.append("("); - dataType.append(col.getType().getWidth()); - dataType.append(")"); - } - metadata.add(dataType.toString()); - } - ValueVectorElementFormatter formatter = new ValueVectorElementFormatter(webSessionResources.getSession().getOptions()); - for (int i = 0; i < rows; ++i) { - final Map<String, String> record = Maps.newHashMap(); - for (VectorWrapper<?> vw : loader) { - final String field = vw.getValueVector().getMetadata().getNamePart().getName(); - final TypeProtos.MinorType fieldMinorType = vw.getValueVector().getMetadata().getMajorType().getMinorType(); - final Accessor accessor = vw.getValueVector().getAccessor(); - final Object value = i < 
accessor.getValueCount() ? accessor.getObject(i) : null; - final String display = value == null ? null : formatter.format(value, fieldMinorType); - record.put(field, display); - } - results.add(record); - } - } finally { - loader.clear(); - } - } catch (Exception e) { - boolean verbose = webSessionResources.getSession().getOptions().getBoolean(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY); - // Wrapping the exception into UserException and then into DrillPBError. - // It will be thrown as exception in QueryWrapper class. - // It's verbosity depends on system option "exec.errors.verbose". - error = UserException.systemError(e).build(logger).getOrCreatePBError(verbose); - } finally { - // Notify the listener with ACK.OK both in error/success case because data was send successfully from Drillbit. - bufferWithData.release(); - listener.success(Acks.OK, null); + // Build metadata only on first batch, or if the schema changes + if (metadata.isEmpty() || batch.isSchemaChanged()) { + columns.clear(); + metadata.clear(); + buildMetadata(batch.getSchema()); } + addResults(batch.getRecordCount(), batch); + batch.zeroVectors(); } - @Override - public ChannelFuture getChannelClosureFuture() { - return webSessionResources.getCloseFuture(); - } + private void buildMetadata(BatchSchema schema) { + for (int i = 0; i < schema.getFieldCount(); ++i) { + // DRILL-6847: This section adds query metadata to the REST results + MaterializedField col = schema.getColumn(i); + columns.add(col.getName()); + StringBuilder dataType = new StringBuilder(col.getType().getMinorType().name()); + + // For DECIMAL type + if (col.getType().hasPrecision()) { + dataType.append("("); + dataType.append(col.getType().getPrecision()); + + if (col.getType().hasScale()) { + dataType.append(", "); + dataType.append(col.getType().getScale()); + } - @Override - public SocketAddress getRemoteAddress() { - return webSessionResources.getRemoteAddress(); + dataType.append(")"); + } else if (col.getType().hasWidth()) { + // 
Case for VARCHAR columns with specified width + dataType.append("("); + dataType.append(col.getType().getWidth()); + dataType.append(")"); + } + metadata.add(dataType.toString()); + } } - @Override - public void setAutoRead(boolean enableAutoRead) { - // no-op + private void addResults(int rows, VectorAccessible batch) { + ValueVectorElementFormatter formatter = new ValueVectorElementFormatter(webSessionResources.getSession().getOptions()); + if (autoLimitRowCount > 0) { + rows = Math.max(0, Math.min(rows, autoLimitRowCount - rowCount)); + } + for (int i = 0; i < rows; ++i) { + rowCount++; + final Map<String, String> record = Maps.newHashMap(); + for (VectorWrapper<?> vw : batch) { + final String field = vw.getValueVector().getMetadata().getNamePart().getName(); + final TypeProtos.MinorType fieldMinorType = vw.getValueVector().getMetadata().getMajorType().getMinorType(); + final Accessor accessor = vw.getValueVector().getAccessor(); + final Object value = i < accessor.getValueCount() ? accessor.getObject(i) : null; + final String display = value == null ? null : formatter.format(value, fieldMinorType); + record.put(field, display); + } + results.add(record); + } } /** * For authenticated WebUser no cleanup of {@link WebSessionResources} is done since it's re-used * for all the queries until lifetime of the web session. 
*/ - public void cleanupSession() { - // no-op - } + public void cleanupSession() { } public static class AnonWebUserConnection extends WebUserConnection { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index ac1b4cc..de49e28 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -21,6 +21,8 @@ import com.codahale.metrics.Gauge; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.shaded.guava.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.drill.common.SelfCleaningRunnable; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.coord.ClusterCoordinator; @@ -60,11 +62,11 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** - * Manages the running fragments in a Drillbit. Periodically requests run-time stats updates from fragments - * running elsewhere. + * Manages the running fragments in a Drillbit. Periodically requests run-time + * stats updates from fragments running elsewhere. 
*/ public class WorkManager implements AutoCloseable { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkManager.class); + private static final Logger logger = LoggerFactory.getLogger(WorkManager.class); private static final int EXIT_TIMEOUT_MS = 5000; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java index ef018f6..5de560a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java @@ -18,14 +18,14 @@ package org.apache.drill.exec.work.prepare; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; -import io.netty.buffer.ByteBuf; + import io.netty.channel.ChannelFuture; import org.apache.drill.common.exceptions.ErrorHelper; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; -import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; +import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage; import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.DrillPBError; @@ -43,6 +43,7 @@ import org.apache.drill.exec.proto.UserProtos.RequestStatus; import org.apache.drill.exec.proto.UserProtos.ResultColumnMetadata; import org.apache.drill.exec.proto.UserProtos.RpcType; import org.apache.drill.exec.proto.UserProtos.RunQuery; +import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.rpc.AbstractDisposableUserClientConnection; import org.apache.drill.exec.rpc.Acks; import 
org.apache.drill.exec.rpc.Response; @@ -53,6 +54,8 @@ import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.store.ischema.InfoSchemaConstants; import org.apache.drill.exec.work.user.UserWorker; import org.joda.time.Period; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.math.BigDecimal; import java.net.SocketAddress; @@ -72,11 +75,11 @@ import static org.apache.drill.exec.proto.UserProtos.RequestStatus.TIMEOUT; * Contains worker {@link Runnable} for creating a prepared statement and helper methods. */ public class PreparedStatementProvider { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PreparedStatementProvider.class); + private static final Logger logger = LoggerFactory.getLogger(PreparedStatementProvider.class); /** - * Static list of mappings from {@link MinorType} to JDBC ResultSet class name (to be returned through - * {@link ResultSetMetaData#getColumnClassName(int)}. + * Static list of mappings from {@link MinorType} to JDBC ResultSet class name + * (to be returned through {@link ResultSetMetaData#getColumnClassName(int)}. 
*/ private static final Map<MinorType, String> DRILL_TYPE_TO_JDBC_CLASSNAME = ImmutableMap.<MinorType, String>builder() .put(MinorType.INT, Integer.class.getName()) @@ -186,9 +189,10 @@ public class PreparedStatementProvider { } /** - * Helper method to create {@link DrillPBError} and set it in <code>respBuilder</code> + * Helper method to create {@link DrillPBError} and set it in {@code respBuilder} */ - private static void setErrorHelper(final CreatePreparedStatementResp.Builder respBuilder, final RequestStatus status, + private static void setErrorHelper(final CreatePreparedStatementResp.Builder respBuilder, + final RequestStatus status, final Throwable ex, final String message, final ErrorType errorType) { respBuilder.setStatus(status); final String errorId = UUID.randomUUID().toString(); @@ -211,7 +215,7 @@ public class PreparedStatementProvider { } /** - * Helper method to log error and set given {@link DrillPBError} in <code>respBuilder</code> + * Helper method to log error and set given {@link DrillPBError} in {@code respBuilder} */ private static void setErrorHelper(final CreatePreparedStatementResp.Builder respBuilder, final DrillPBError error, final String message) { @@ -250,14 +254,14 @@ public class PreparedStatementProvider { } @Override - public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result) { + public void sendData(RpcOutcomeListener<Ack> listener, QueryDataPackage data) { // Save the query results schema and release the buffers. - if (fields == null) { - fields = result.getHeader().getDef().getFieldList(); - } - - for (ByteBuf buf : result.getBuffers()) { - buf.release(); + VectorContainer batch = data.batch(); + if (batch != null) { + if (fields == null) { + fields = data.fields(); + } + batch.zeroVectors(); } listener.success(Acks.OK, null); @@ -287,16 +291,18 @@ public class PreparedStatementProvider { builder.setCatalogName(InfoSchemaConstants.IS_CATALOG_NAME); /** - * Designated column's schema name. 
Empty string if not applicable. Initial implementation defaults to empty string - * as we use LIMIT 0 queries to get the schema and schema info is lost. If we derive the schema from plan, we may - * get the right value. + * Designated column's schema name. Empty string if not applicable. Initial + * implementation defaults to empty string as we use LIMIT 0 queries to get + * the schema and schema info is lost. If we derive the schema from plan, we + * may get the right value. */ builder.setSchemaName(""); /** - * Designated column's table name. Not set if not applicable. Initial implementation defaults to empty string as - * we use LIMIT 0 queries to get the schema and table info is lost. If we derive the table from plan, we may get - * the right value. + * Designated column's table name. Not set if not applicable. Initial + * implementation defaults to empty string as we use LIMIT 0 queries to get + * the schema and table info is lost. If we derive the table from plan, we + * may get the right value. */ builder.setTableName(""); @@ -327,7 +333,8 @@ public class PreparedStatementProvider { builder.setPrecision(Types.getPrecision(field.getMajorType())); /** - * Column's number of digits to right of the decimal point. 0 is returned for types where the scale is not applicable + * Column's number of digits to right of the decimal point. 0 is returned + * for types where the scale is not applicable */ builder.setScale(Types.getScale(majorType)); @@ -342,8 +349,8 @@ public class PreparedStatementProvider { builder.setDisplaySize(Types.getJdbcDisplaySize(majorType)); /** - * Is the column an aliased column. Initial implementation defaults to true as we derive schema from LIMIT 0 query and - * not plan + * Is the column an aliased column. 
Initial implementation defaults to true + * as we derive schema from LIMIT 0 query and not plan */ builder.setIsAliased(true); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java index 9c32b56..976820d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java @@ -41,9 +41,11 @@ import org.apache.drill.exec.work.foreman.Foreman; import org.apache.drill.exec.work.metadata.MetadataProvider; import org.apache.drill.exec.work.metadata.ServerMetaProvider.ServerMetaWorker; import org.apache.drill.exec.work.prepare.PreparedStatementProvider.PreparedStatementWorker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class UserWorker{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserWorker.class); + static final Logger logger = LoggerFactory.getLogger(UserWorker.class); private final WorkerBee bee; private final QueryCountIncrementer incrementer = new QueryCountIncrementer() { diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java index 3fc5f7c..9e90d06 100644 --- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java +++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java @@ -42,8 +42,9 @@ import org.slf4j.LoggerFactory; import io.netty.util.internal.PlatformDependent; /** - * Drill data structure for accessing and manipulating data buffers. This class is integrated with the - * Drill memory management layer for quota enforcement and buffer sharing. + * Drill data structure for accessing and manipulating data buffers. This class + * is integrated with the Drill memory management layer for quota enforcement + * and buffer sharing. 
*/ @SuppressWarnings("unused") public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { @@ -121,18 +122,22 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { } /** - * Create a new DrillBuf that is associated with an alternative allocator for the purposes of memory ownership and - * accounting. This has no impact on the reference counting for the current DrillBuf except in the situation where the - * passed in Allocator is the same as the current buffer. + * Create a new DrillBuf that is associated with an alternative allocator for + * the purposes of memory ownership and accounting. This has no impact on the + * reference counting for the current DrillBuf except in the situation where + * the passed in Allocator is the same as the current buffer. * - * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf with either have a - * reference count of 1 (in the case that this is the first time this memory is being associated with the new - * allocator) or the current value of the reference count + 1 for the other AllocationManager/BufferLedger combination - * in the case that the provided allocator already had an association to this underlying memory. + * This operation has no impact on the reference count of this DrillBuf. The + * newly created DrillBuf will either have a reference count of 1 (in the case + * that this is the first time this memory is being associated with the new + * allocator) or the current value of the reference count + 1 for the other + * AllocationManager/BufferLedger combination in the case that the provided + * allocator already had an association to this underlying memory. * * @param target * The target allocator to create an association with. - * @return A new DrillBuf which shares the same underlying memory as this DrillBuf. + * @return A new DrillBuf which shares the same underlying memory as this + * DrillBuf. 
*/ public DrillBuf retain(BufferAllocator target) { @@ -148,28 +153,35 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { } /** - * Transfer the memory accounting ownership of this DrillBuf to another allocator. This will generate a new DrillBuf - * that carries an association with the underlying memory of this DrillBuf. If this DrillBuf is connected to the - * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to the target allocator. If - * this DrillBuf does not currently own the memory underlying it (and is only associated with it), this does not - * transfer any ownership to the newly created DrillBuf. + * Transfer the memory accounting ownership of this DrillBuf to another + * allocator. This will generate a new DrillBuf that carries an association + * with the underlying memory of this DrillBuf. If this DrillBuf is connected + * to the owning BufferLedger of this memory, that memory ownership/accounting + * will be transferred to the target allocator. If this DrillBuf does not + * currently own the memory underlying it (and is only associated with it), + * this does not transfer any ownership to the newly created DrillBuf. * <p> - * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf with either have a - * reference count of 1 (in the case that this is the first time this memory is being associated with the new - * allocator) or the current value of the reference count for the other AllocationManager/BufferLedger combination in - * the case that the provided allocator already had an association to this underlying memory. + * This operation has no impact on the reference count of this DrillBuf. 
The + * newly created DrillBuf will either have a reference count of 1 (in the case + * that this is the first time this memory is being associated with the new + * allocator) or the current value of the reference count for the other + * AllocationManager/BufferLedger combination in the case that the provided + * allocator already had an association to this underlying memory. * <p> - * Transfers will always succeed, even if that puts the other allocator into an overlimit situation. This is possible - * due to the fact that the original owning allocator may have allocated this memory out of a local reservation - * whereas the target allocator may need to allocate new memory from a parent or RootAllocator. This operation is done - * in a mostly-lockless but consistent manner. As such, the overlimit==true situation could occur slightly prematurely - * to an actual overlimit==true condition. This is simply conservative behavior which means we may return overlimit - * slightly sooner than is necessary. + * Transfers will always succeed, even if that puts the other allocator into + * an overlimit situation. This is possible due to the fact that the original + * owning allocator may have allocated this memory out of a local reservation + * whereas the target allocator may need to allocate new memory from a parent + * or RootAllocator. This operation is done in a mostly-lockless but + * consistent manner. As such, the overlimit==true situation could occur + * slightly prematurely to an actual overlimit==true condition. This is simply + * conservative behavior which means we may return overlimit slightly sooner + * than is necessary. * * @param target * The allocator to transfer ownership to. - * @return A new transfer result with the impact of the transfer (whether it was overlimit) as well as the newly - * created DrillBuf. + * @return A new transfer result with the impact of the transfer (whether it + * was overlimit) as well as the newly created DrillBuf. 
*/ public TransferResult transferOwnership(BufferAllocator target) { diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java index bfce9a0..36550d0 100644 --- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java +++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java @@ -43,11 +43,11 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; * UDLE. Ensures that one allocator owns the memory that multiple allocators may * be referencing. Manages a BufferLedger between each of its associated * allocators. This class is also responsible for managing when memory is - * allocated and returned to the Netty-based PooledByteBufAllocatorL. - * + * allocated and returned to the Netty-based {@code PooledByteBufAllocatorL}. + * <p> * The only reason that this isn't package private is we're forced to put * DrillBuf in Netty's package which need access to these objects or methods. - * + * <p> * Threading: AllocationManager manages thread-safety internally. Operations * within the context of a single BufferLedger are lockless in nature and can be * leveraged by multiple threads. Operations that cross the context of two @@ -56,7 +56,6 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; * allocation. As such, there will be thousands of these in a typical query. The * contention of acquiring a lock on AllocationManager should be very low. */ - public class AllocationManager { private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0); @@ -151,7 +150,6 @@ public class AllocationManager { * AllocationManager that it now longer needs to hold a reference to * particular piece of memory. 
*/ - private class ReleaseListener { private final BufferAllocator allocator; @@ -163,7 +161,6 @@ public class AllocationManager { /** * Can only be called when you already hold the writeLock. */ - public void release() { allocator.assertOpen(); @@ -200,7 +197,6 @@ public class AllocationManager { * only reason this is public is due to DrillBuf being in io.netty.buffer * package. */ - public class BufferLedger { private final IdentityHashMap<DrillBuf, Object> buffers = @@ -223,8 +219,10 @@ public class AllocationManager { } /** - * Transfer any balance the current ledger has to the target ledger. In the case that the current ledger holds no - * memory, no transfer is made to the new ledger. + * Transfer any balance the current ledger has to the target ledger. In the + * case that the current ledger holds no memory, no transfer is made to the + * new ledger. + * * @param target * The ledger to transfer ownership account to. * @return Whether transfer fit within target ledgers limits. @@ -241,8 +239,8 @@ public class AllocationManager { return true; } - // since two balance transfers out from the allocator manager could cause incorrect accounting, we need to ensure - // that this won't happen by synchronizing on the allocator manager instance. + // since two balance transfers out from the allocator manager could cause incorrect accounting, + // we need to ensure that this won't happen by synchronizing on the allocator manager instance. try (@SuppressWarnings("unused") Closeable write = writeLock.open()) { if (owningLedger != this) { return true; @@ -316,7 +314,6 @@ public class AllocationManager { * zero, this ledger should release its ownership back to the * AllocationManager */ - public int decrement(int decrement) { allocator.assertOpen(); @@ -346,7 +343,6 @@ public class AllocationManager { * @param allocator * @return The ledger associated with a particular BufferAllocator. 
*/ - public BufferLedger getLedgerForAllocator(BufferAllocator allocator) { return associate((BaseAllocator) allocator); } @@ -362,7 +358,6 @@ public class AllocationManager { * @return A new DrillBuf that shares references with all DrillBufs * associated with this BufferLedger */ - public DrillBuf newDrillBuf(int offset, int length) { allocator.assertOpen(); return newDrillBuf(offset, length, null); @@ -411,7 +406,6 @@ public class AllocationManager { * * @return Size in bytes */ - public int getSize() { return size; } diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java index 8514df0..9498cc8 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java @@ -20,20 +20,16 @@ package org.apache.drill.exec.rpc; import io.netty.buffer.ByteBuf; public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRpcOutcomeListener.class); @Override - public void failed(RpcException ex) { - } + public void failed(RpcException ex) { } @Override - public void success(T value, ByteBuf buffer) { - } + public void success(T value, ByteBuf buffer) { } /** * {@inheritDoc} */ @Override - public void interrupted(final InterruptedException ex) { - } + public void interrupted(final InterruptedException ex) { } } diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java index 83380e2..cc2d9b9 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java @@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.drill.common.exceptions.UserRemoteException; import 
org.apache.drill.exec.proto.UserBitShared.DrillPBError; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.carrotsearch.hppc.IntObjectHashMap; import com.carrotsearch.hppc.procedures.IntObjectProcedure; @@ -38,7 +40,7 @@ import io.netty.channel.ChannelFuture; * else works via Atomic variables. */ class RequestIdMap { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestIdMap.class); + private static final Logger logger = LoggerFactory.getLogger(RequestIdMap.class); private final AtomicInteger lastCoordinationId = new AtomicInteger(); private final AtomicBoolean isOpen = new AtomicBoolean(true); @@ -72,9 +74,9 @@ class RequestIdMap { @Override public void apply(int key, RpcOutcome<?> value) { - try{ + try { value.setException(exception); - }catch (final Exception e){ + } catch (final Exception e){ logger.warn("Failure while attempting to fail rpc response.", e); } } @@ -158,6 +160,7 @@ class RequestIdMap { return rpc; } + @SuppressWarnings("unchecked") public <V> RpcOutcome<V> getAndRemoveRpcOutcome(int rpcType, int coordinationId, Class<V> clazz) { final RpcOutcome<?> rpc = removeFromMap(coordinationId); @@ -172,16 +175,10 @@ class RequestIdMap { clazz.getCanonicalName(), rpcType, outcomeClass.getCanonicalName())); } - @SuppressWarnings("unchecked") - final - RpcOutcome<V> crpc = (RpcOutcome<V>) rpc; - - // logger.debug("Returning casted future"); - return crpc; + return (RpcOutcome<V>) rpc; } public void recordRemoteFailure(int coordinationId, DrillPBError failure) { - // logger.debug("Updating failed future."); try { final RpcOutcome<?> rpc = removeFromMap(coordinationId); rpc.setException(new UserRemoteException(failure)); diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java index a1c125b..4afa159 100644 --- 
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java @@ -27,13 +27,11 @@ public interface RpcOutcomeListener<V> { */ void failed(RpcException ex); - void success(V value, ByteBuf buffer); /** - * Called when the sending thread is interrupted. Possible when the fragment is cancelled due to query cancellations or - * failures. + * Called when the sending thread is interrupted. Possible when the fragment + * is cancelled due to query cancellations or failures. */ void interrupted(final InterruptedException e); - }
