http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java index 4cd4155..1675b52 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java @@ -72,7 +72,7 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne connection = connectionHolder.get(); if (connection != null) { cmd.connectionAvailable(connection); - + } else { // logger.debug("No connection active, opening client connection."); BasicClient<?, CONNECTION_TYPE, OUTBOUND_HANDSHAKE, ?> client = getNewClient(); @@ -145,7 +145,7 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne incoming.getChannel().close(); } set(connection); - + } }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java index cc3ec69..3a139f8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java @@ -17,12 +17,12 @@ */ package org.apache.drill.exec.rpc; -import java.util.concurrent.ExecutionException; - import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import java.util.concurrent.ExecutionException; + import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.rpc.user.ConnectionThrottle; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java index af1316c..d75e902 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java @@ -21,7 +21,7 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure; public class RemoteRpcException extends RpcException{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteRpcException.class); - + private final RpcFailure failure; public RemoteRpcException(RpcFailure failure) { @@ -32,7 +32,7 @@ public class RemoteRpcException extends RpcException{ public RpcFailure getFailure() { return failure; } - - - + + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java index a19b753..615bccc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java @@ -37,11 +37,13 @@ public class ResettableBarrier { setState(1); } + @Override protected int tryAcquireShared(int acquires) { assert acquires == 1; return (getState() == 0) ? 1 : -1; } + @Override protected boolean tryReleaseShared(int releases) { assert releases == 1; @@ -76,7 +78,7 @@ public class ResettableBarrier { // logger.debug("opening barrier."); sync.releaseShared(1); } - + public void closeBarrier(){ // logger.debug("closing barrier."); sync.reset(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java index 12b73a8..b48adec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java @@ -24,19 +24,19 @@ import com.google.protobuf.MessageLite; public class Response { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Response.class); - + public EnumLite rpcType; public MessageLite pBody; public ByteBuf[] dBodies; - + public Response(EnumLite rpcType, MessageLite pBody, ByteBuf... dBodies) { super(); this.rpcType = rpcType; this.pBody = pBody; this.dBodies = dBodies; } - - - - + + + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java index 05cca5a..c6979e5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.rpc; -import com.google.common.base.Stopwatch; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.channel.Channel; @@ -31,9 +29,7 @@ import io.netty.util.concurrent.GenericFutureListener; import java.io.Closeable; import java.util.Arrays; import java.util.List; -import java.util.concurrent.TimeUnit; -import org.apache.drill.exec.proto.BitControl; import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure; import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java index 8434ba9..11b07ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java @@ -23,9 +23,9 @@ import com.google.common.util.concurrent.AbstractCheckedFuture; import com.google.common.util.concurrent.ListenableFuture; public class RpcCheckedFuture<T> extends AbstractCheckedFuture<T, RpcException> implements DrillRpcFuture<T>{ - + volatile ByteBuf buffer; - + public RpcCheckedFuture(ListenableFuture<T> delegate) { super(delegate); } @@ -33,7 +33,7 @@ public class RpcCheckedFuture<T> extends AbstractCheckedFuture<T, RpcException> public void set(T obj, ByteBuf buffer){ this.buffer = buffer; } - + @Override protected RpcException mapException(Exception e) { return RpcException.mapException(e); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java index 9523177..93b901c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.rpc; -import org.apache.drill.exec.rpc.RpcConnectionHandler; - import com.google.protobuf.MessageLite; public interface RpcCommand<T extends MessageLite, C extends RemoteConnection> extends RpcConnectionHandler<C>{ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java index 6ab121e..3010f2b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java @@ -30,13 +30,13 @@ public class RpcConfig { private final String name; private final Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap; private final Map<Integer, RpcMessageType<?, ?, ?>> receiveMap; - + private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap, Map<Integer, RpcMessageType<?, ?, ?>> receiveMap){ this.name = name; this.sendMap = ImmutableMap.copyOf(sendMap); this.receiveMap = ImmutableMap.copyOf(receiveMap); } - + public String getName() { return name; } @@ -51,7 +51,7 @@ public class RpcConfig { } return true; } - + public boolean checkSend(EnumLite send, Class<?> sendClass, Class<?> receiveClass){ if(RpcConstants.EXTRA_DEBUGGING) logger.debug(String.format("Checking send classes for send RpcType %s. Send Class is %s and Receive class is %s.", send, sendClass, receiveClass)); RpcMessageType<?,?,?> type = sendMap.get(send); @@ -62,16 +62,16 @@ public class RpcConfig { return true; } - + public boolean checkResponseSend(EnumLite responseType, Class<?> responseClass){ if(RpcConstants.EXTRA_DEBUGGING) logger.debug(String.format("Checking responce send of type %s with response class of %s.", responseType, responseClass)); RpcMessageType<?,?,?> type = receiveMap.get(responseType.getNumber()); if(type == null) throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc response of type %s.", name, responseType)); if(type.getRet() != responseClass) throw new IllegalStateException(String.format("%s: The definition for the response doesn't match implementation code. The definition is %s however the current response is trying to response with an object of type %s.", name, type, responseClass.getCanonicalName())); - + return true; } - + public static class RpcMessageType<SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite>{ private T sendEnum; private Class<SEND> send; @@ -113,37 +113,37 @@ public class RpcConfig { return "RpcMessageType [sendEnum=" + sendEnum + ", send=" + send + ", receiveEnum=" + receiveEnum + ", ret=" + ret + "]"; } - - + + } public static RpcConfigBuilder newBuilder(String name){ return new RpcConfigBuilder(name); } - + public static class RpcConfigBuilder { private final String name; private Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap = Maps.newHashMap(); - private Map<Integer, RpcMessageType<?, ?, ?>> receiveMap = Maps.newHashMap(); - + private Map<Integer, RpcMessageType<?, ?, ?>> receiveMap = Maps.newHashMap(); + private RpcConfigBuilder(String name){ this.name = name; } - + public <SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite> RpcConfigBuilder add(T sendEnum, Class<SEND> send, T receiveEnum, Class<RECEIVE> rec){ RpcMessageType<SEND, RECEIVE, T> type = new RpcMessageType<SEND, RECEIVE, T>(sendEnum, send, receiveEnum, rec); this.sendMap.put(sendEnum, type); this.receiveMap.put(receiveEnum.getNumber(), type); return this; } - + public RpcConfig build(){ return new RpcConfig(name, sendMap, receiveMap); } } - - + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java index bcec2d4..7618231 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java @@ -19,10 +19,10 @@ package org.apache.drill.exec.rpc; public interface RpcConnectionHandler<T extends RemoteConnection> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConnectionHandler.class); - + public static enum FailureType{CONNECTION, HANDSHAKE_COMMUNICATION, HANDSHAKE_VALIDATION} - + public void connectionSucceeded(T connection); public void connectionFailed(FailureType type, Throwable t); - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java index b1ae932..4be365c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java @@ -19,9 +19,9 @@ package org.apache.drill.exec.rpc; public class RpcConstants { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConstants.class); - + private RpcConstants(){} - + public static final boolean SOME_DEBUGGING = false; public static final boolean EXTRA_DEBUGGING = false; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java index abddcb3..f4fe64d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java @@ -34,20 +34,20 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.RpcHeader; */ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> { final org.slf4j.Logger logger; - + private final AtomicLong messageCounter = new AtomicLong(); - + public RpcDecoder(String name){ this.logger = org.slf4j.LoggerFactory.getLogger(RpcDecoder.class.getCanonicalName() + "-" + name); } - + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception { if(!ctx.channel().isOpen()){ return; } - + if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Inbound rpc message received."); // now, we know the entire message is in the buffer and the buffer is constrained to this message. Additionally, @@ -60,7 +60,7 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> { final RpcHeader header = RpcHeader.parseDelimitedFrom(is); if(RpcConstants.EXTRA_DEBUGGING) logger.debug(" post header read index {}", buffer.readerIndex()); - + // read the protobuf body into a buffer. checkTag(is, RpcEncoder.PROTOBUF_BODY_TAG); final int pBodyLength = readRawVarint32(is); @@ -70,13 +70,13 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> { if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Read protobuf body of length {} into buffer {}.", pBodyLength, pBody); if(RpcConstants.EXTRA_DEBUGGING) logger.debug("post protobufbody read index {}", buffer.readerIndex()); - + ByteBuf dBody = null; int dBodyLength = 0; // read the data body. if (buffer.readableBytes() > 0) { - + if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Reading raw body, buffer has {} bytes available, is available {}.", buffer.readableBytes(), is.available()); checkTag(is, RpcEncoder.RAW_BODY_TAG); dBodyLength = readRawVarint32(is); @@ -84,7 +84,7 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> { dBody = buffer.slice(); dBody.retain(); if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Read raw body of {}", dBody); - + }else{ if(RpcConstants.EXTRA_DEBUGGING) logger.debug("No need to read raw body, no readable bytes left."); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java index 9c3f5ca..8bf3483 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java @@ -29,7 +29,6 @@ import java.util.List; import org.apache.drill.exec.proto.GeneralRPCProtos.CompleteRpcMessage; import org.apache.drill.exec.proto.GeneralRPCProtos.RpcHeader; -import com.google.common.base.Preconditions; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.WireFormat; @@ -38,36 +37,36 @@ import com.google.protobuf.WireFormat; */ class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{ final org.slf4j.Logger logger; - + static final int HEADER_TAG = makeTag(CompleteRpcMessage.HEADER_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED); static final int PROTOBUF_BODY_TAG = makeTag(CompleteRpcMessage.PROTOBUF_BODY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED); static final int RAW_BODY_TAG = makeTag(CompleteRpcMessage.RAW_BODY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED); static final int HEADER_TAG_LENGTH = getRawVarintSize(HEADER_TAG); static final int PROTOBUF_BODY_TAG_LENGTH = getRawVarintSize(PROTOBUF_BODY_TAG); static final int RAW_BODY_TAG_LENGTH = getRawVarintSize(RAW_BODY_TAG); - + public RpcEncoder(String name){ this.logger = org.slf4j.LoggerFactory.getLogger(RpcEncoder.class.getCanonicalName() + "-" + name); } - + @Override protected void encode(ChannelHandlerContext ctx, OutboundRpcMessage msg, List<Object> out) throws Exception { if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Rpc Encoder called with msg {}", msg); - + if(!ctx.channel().isOpen()){ //output.add(ctx.alloc().buffer(0)); logger.debug("Channel closed, skipping encode."); return; } - + try{ if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Encoding outbound message {}", msg); - // first we build the RpcHeader + // first we build the RpcHeader RpcHeader header = RpcHeader.newBuilder() // .setMode(msg.mode) // .setCoordinationId(msg.coordinationId) // .setRpcType(msg.rpcType).build(); - + // figure out the full length int headerLength = header.getSerializedSize(); int protoBodyLength = msg.pBody.getSerializedSize(); @@ -75,7 +74,7 @@ class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{ int fullLength = // HEADER_TAG_LENGTH + getRawVarintSize(headerLength) + headerLength + // PROTOBUF_BODY_TAG_LENGTH + getRawVarintSize(protoBodyLength) + protoBodyLength; // - + if(rawBodyLength > 0){ fullLength += (RAW_BODY_TAG_LENGTH + getRawVarintSize(rawBodyLength) + rawBodyLength); } @@ -86,7 +85,7 @@ class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{ // write full length first (this is length delimited stream). cos.writeRawVarint32(fullLength); - + // write header cos.writeRawVarint32(HEADER_TAG); cos.writeRawVarint32(headerLength); @@ -96,15 +95,15 @@ class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{ cos.writeRawVarint32(PROTOBUF_BODY_TAG); cos.writeRawVarint32(protoBodyLength); msg.pBody.writeTo(cos); - + // if exists, write data body and tag. if(msg.getRawBodySize() > 0){ if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Writing raw body of size {}", msg.getRawBodySize()); - + cos.writeRawVarint32(RAW_BODY_TAG); cos.writeRawVarint32(rawBodyLength); cos.flush(); // need to flush so that dbody goes after if cos is caching. - + CompositeByteBuf cbb = new CompositeByteBuf(buf.alloc(), true, msg.dBodies.length + 1); cbb.addComponent(buf); int bufLength = buf.readableBytes(); @@ -114,27 +113,27 @@ class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{ } cbb.writerIndex(bufLength); out.add(cbb); - - + + }else{ cos.flush(); out.add(buf); } - + if(RpcConstants.SOME_DEBUGGING) logger.debug("Wrote message length {}:{} bytes (head:body). Message: " + msg, getRawVarintSize(fullLength), fullLength); if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Sent message. Ending writer index was {}.", buf.writerIndex()); - + }finally{ // make sure to release Rpc Messages underlying byte buffers. //msg.release(); } } - + /** Makes a tag value given a field number and wire type, copied from WireFormat since it isn't public. */ static int makeTag(final int fieldNumber, final int wireType) { return (fieldNumber << 3) | wireType; } - + public static int getRawVarintSize(int value) { int count = 0; while (true) { @@ -147,5 +146,5 @@ class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{ } } } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java index ee42ee5..537452e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java @@ -22,14 +22,14 @@ import io.netty.channel.ChannelHandlerContext; public class RpcExceptionHandler implements ChannelHandler{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcExceptionHandler.class); - + public RpcExceptionHandler(){ } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - + if(!ctx.channel().isOpen() || cause.getMessage().equals("Connection reset by peer")){ logger.warn("Exception with closed channel", cause); return; @@ -48,5 +48,5 @@ public class RpcExceptionHandler implements ChannelHandler{ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java index 7f97e27..9712c9a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java @@ -17,24 +17,22 @@ */ package org.apache.drill.exec.rpc; -import io.netty.buffer.ByteBuf; - import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode; public abstract class RpcMessage { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcMessage.class); - + public RpcMode mode; public int rpcType; public int coordinationId; - + public RpcMessage(RpcMode mode, int rpcType, int coordinationId) { this.mode = mode; this.rpcType = rpcType; this.coordinationId = coordinationId; } - + public abstract int getBodySize(); abstract void release(); - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java index c60d1df..af9aa01 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java @@ -21,7 +21,7 @@ import io.netty.buffer.ByteBuf; public interface RpcOutcome<T> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcOutcome.class); - + public void set(Object value, ByteBuf buffer); public void setException(Throwable t); public Class<T> getOutcomeType(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java index a4bf2c2..7d7c860 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java @@ -20,10 +20,10 @@ package org.apache.drill.exec.rpc; import io.netty.buffer.ByteBuf; public interface RpcOutcomeListener<V> { - + public void failed(RpcException ex); public void success(V value, ByteBuf buffer); - - + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java index 649dc09..37c9ce2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java @@ -28,13 +28,13 @@ import com.google.common.collect.Maps; public class ConnectionManagerRegistry implements Iterable<ControlConnectionManager>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectionManagerRegistry.class); - + private final ConcurrentMap<DrillbitEndpoint, ControlConnectionManager> registry = Maps.newConcurrentMap(); - + private final ControlMessageHandler handler; private final BootStrapContext context; private volatile DrillbitEndpoint localEndpoint; - + public ConnectionManagerRegistry(ControlMessageHandler handler, BootStrapContext context) { super(); this.handler = handler; @@ -49,7 +49,7 @@ public class ConnectionManagerRegistry implements Iterable<ControlConnectionMana ControlConnectionManager m2 = registry.putIfAbsent(endpoint, m); if(m2 != null) m = m2; } - + return m; } @@ -57,9 +57,9 @@ public class ConnectionManagerRegistry implements Iterable<ControlConnectionMana public Iterator<ControlConnectionManager> iterator() { return registry.values().iterator(); } - + public void setEndpoint(DrillbitEndpoint endpoint){ this.localEndpoint = endpoint; } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java index 84595c6..879df40 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java @@ -48,7 +48,7 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo private final ControlConnectionManager.CloseHandlerCreator closeHandlerFactory; private final DrillbitEndpoint localIdentity; private final BufferAllocator allocator; - + public ControlClient(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint, ControlMessageHandler handler, BootStrapContext context, ControlConnectionManager.CloseHandlerCreator closeHandlerFactory) { super(ControlRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup(), RpcType.HANDSHAKE, BitControlHandshake.class, BitControlHandshake.PARSER); this.localIdentity = localEndpoint; @@ -57,7 +57,7 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo this.closeHandlerFactory = closeHandlerFactory; this.allocator = context.getAllocator(); } - + public void connect(RpcConnectionHandler<ControlConnection> connectionHandler) { connectAsClient(connectionHandler, BitControlHandshake.newBuilder().setRpcVersion(ControlRpcConfig.RPC_VERSION).setEndpoint(localIdentity).build(), remoteEndpoint.getAddress(), remoteEndpoint.getControlPort()); } @@ -102,5 +102,5 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { return new ControlProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE); } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java index c03b7c2..6ac6dd5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java @@ -72,6 +72,7 @@ public class ControlConnection extends RemoteConnection { active = false; } + @Override public boolean isActive() { return active; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java index f3274a9..60af2a9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java @@ -29,12 +29,12 @@ import org.apache.drill.exec.work.batch.ControlMessageHandler; */ public class ControlConnectionManager extends ReconnectingConnection<ControlConnection, BitControlHandshake>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnectionManager.class); - + private final DrillbitEndpoint endpoint; private final ControlMessageHandler handler; private final BootStrapContext context; private final DrillbitEndpoint localIdentity; - + public ControlConnectionManager(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localIdentity, ControlMessageHandler handler, BootStrapContext context) { super(BitControlHandshake.newBuilder().setRpcVersion(ControlRpcConfig.RPC_VERSION).setEndpoint(localIdentity).build(), remoteEndpoint.getAddress(), remoteEndpoint.getControlPort()); assert remoteEndpoint != null : "Endpoint cannot be null."; @@ -52,9 +52,9 @@ public class ControlConnectionManager extends ReconnectingConnection<ControlConn return new ControlClient(endpoint, localIdentity, handler, context, new CloseHandlerCreator()); } - + public DrillbitEndpoint getEndpoint() { return endpoint; } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java index 7edfe20..36573b5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java @@ -32,8 +32,9 @@ import org.apache.drill.exec.rpc.ProtobufLengthDecoder; public class ControlProtobufLengthDecoder extends ProtobufLengthDecoder{ public ControlProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { super(allocator, outOfMemoryHandler); - + } + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { super.decode(ctx, in, out); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java index 9953e5f..31fbe7b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java @@ -33,7 +33,7 @@ import org.apache.drill.exec.rpc.RpcConfig; public class ControlRpcConfig { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlRpcConfig.class); - + public static RpcConfig MAPPING = RpcConfig.newBuilder("BIT-CONTROL-RPC-MAPPING") // .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class) .add(RpcType.REQ_INIATILIZE_FRAGMENT, PlanFragment.class, RpcType.ACK, Ack.class) @@ -42,8 +42,8 @@ public class ControlRpcConfig { .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class) .build(); - + public static int RPC_VERSION = 2; - + public static final Response OK = new Response(RpcType.ACK, Acks.OK); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java index fc1d98c..393773d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java @@ -37,19 +37,19 @@ import com.google.protobuf.MessageLite; public class ControlServer extends BasicServer<RpcType, ControlConnection>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlServer.class); - + private final ControlMessageHandler handler; private final ConnectionManagerRegistry connectionRegistry; private volatile ProxyCloseHandler proxyCloseHandler; private BufferAllocator allocator; - + public ControlServer(ControlMessageHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry) { super(ControlRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup()); this.handler = handler; this.connectionRegistry = connectionRegistry; this.allocator = context.getAllocator(); } - + @Override public MessageLite getResponseDefaultInstance(int rpcType) throws RpcException { return DefaultInstanceHandler.getResponseDefaultInstance(rpcType); @@ -70,12 +70,12 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{ public ControlConnection initRemoteConnection(Channel channel) { return new ControlConnection(channel, this, allocator); } - - + + @Override protected ServerHandshakeHandler<BitControlHandshake> getHandshakeHandler(final ControlConnection connection) { return new ServerHandshakeHandler<BitControlHandshake>(RpcType.HANDSHAKE, BitControlHandshake.PARSER){ - + @Override public MessageLite getHandshakeResponse(BitControlHandshake inbound) throws Exception { // logger.debug("Handling handshake from other bit. {}", inbound); @@ -83,13 +83,13 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{ if(!inbound.hasEndpoint() || inbound.getEndpoint().getAddress().isEmpty() || inbound.getEndpoint().getControlPort() < 1) throw new RpcException(String.format("RPC didn't provide valid counter endpoint information. Received %s.", inbound.getEndpoint())); connection.setEndpoint(inbound.getEndpoint()); - // add the + // add the ControlConnectionManager manager = connectionRegistry.getConnectionManager(inbound.getEndpoint()); - + // update the close handler. proxyCloseHandler.setHandler(manager.getCloseHandlerCreator().getHandler(connection, proxyCloseHandler.getHandler())); - - // add to the connection manager. + + // add to the connection manager. manager.addExternalConnection(connection); return BitControlHandshake.newBuilder().setRpcVersion(ControlRpcConfig.RPC_VERSION).build(); @@ -106,7 +106,7 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{ private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> { private volatile GenericFutureListener<ChannelFuture> handler; - + public ProxyCloseHandler(GenericFutureListener<ChannelFuture> handler) { super(); this.handler = handler; @@ -126,8 +126,8 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{ public void operationComplete(ChannelFuture future) throws Exception { handler.operationComplete(future); } - + } - - + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java index 9a11c94..7f84a2b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java @@ -21,7 +21,6 @@ import java.io.Closeable; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.work.fragment.FragmentManager; /** * Service that allows one Drillbit to communicate with another. Internally manages whether each particular bit is a @@ -34,7 +33,7 @@ public interface Controller extends Closeable { /** * Get a Bit to Bit communication tunnel. If the BitCom doesn't have a tunnel attached to the node already, it will * start creating one. This create the connection asynchronously. - * + * * @param node * @return */ @@ -42,5 +41,5 @@ public interface Controller extends Closeable { public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException; - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java index 1cacc4f..f8f6fd7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java @@ -17,16 +17,12 @@ */ package org.apache.drill.exec.rpc.control; -import java.util.concurrent.ConcurrentMap; - import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.server.BootStrapContext; import org.apache.drill.exec.work.batch.ControlMessageHandler; -import org.apache.drill.exec.work.fragment.FragmentManager; -import com.google.common.collect.Maps; import com.google.common.io.Closeables; /** http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java index ed0d370..10fe343 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java @@ -30,7 +30,7 @@ import com.google.protobuf.MessageLite; public class DefaultInstanceHandler { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultInstanceHandler.class); - + public static MessageLite getResponseDefaultInstance(int rpcType) throws RpcException { switch (rpcType) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java index cbfa1f9..60d2cdf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java @@ -22,9 +22,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.LoadingCache; import org.apache.drill.exec.cache.DistributedMap; import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.proto.BitControl.FragmentStatus; @@ -39,6 +36,8 @@ import org.apache.drill.exec.work.fragment.FragmentManager; import org.apache.drill.exec.work.fragment.NonRootFragmentManager; import org.apache.drill.exec.work.fragment.RootFragmentManager; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.Maps; public class WorkEventBus { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java index 4583f88..1d539a2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java @@ -21,7 +21,6 @@ import io.netty.channel.Channel; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.rpc.RemoteConnection; -import org.apache.drill.exec.work.fragment.FragmentManager; public class BitServerConnection extends RemoteConnection{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServerConnection.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java index ed51264..67856f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java @@ -45,14 +45,14 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl private final BufferAllocator allocator; private final DataConnectionManager.CloseHandlerCreator closeHandlerFactory; - + public DataClient(DrillbitEndpoint remoteEndpoint, BootStrapContext context, DataConnectionManager.CloseHandlerCreator closeHandlerFactory) { super(DataRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitClientLoopGroup(), RpcType.HANDSHAKE, BitServerHandshake.class, BitServerHandshake.PARSER); this.remoteEndpoint = remoteEndpoint; this.closeHandlerFactory = closeHandlerFactory; this.allocator = context.getAllocator(); } - + @Override public DataClientConnection initRemoteConnection(Channel channel) { this.connection = new DataClientConnection(channel, this); @@ -77,7 +77,7 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl BufferAllocator getAllocator(){ return allocator; } - + @Override protected void validateHandshake(BitServerHandshake handshake) throws RpcException { if(handshake.getRpcVersion() != DataRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", handshake.getRpcVersion(), DataRpcConfig.RPC_VERSION)); @@ -90,7 +90,7 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl public DataClientConnection getConnection(){ return this.connection; } - + @Override public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { return new DataProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java index 9656a14..ecd10eb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java @@ -31,18 +31,18 @@ import com.google.common.io.Closeables; import com.google.protobuf.MessageLite; public class DataClientConnection extends RemoteConnection{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataClientConnection.class); - + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataClientConnection.class); + private final DataClient client; private final UUID id; - + public DataClientConnection(Channel channel, DataClient client){ super(channel); this.client = client; // we use a local listener pool unless a global one is provided. this.id = UUID.randomUUID(); } - + @Override public BufferAllocator getAllocator() { return client.getAllocator(); @@ -50,10 +50,10 @@ public class DataClientConnection extends RemoteConnection{ public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> outcomeListener, RpcType rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies){ - client.send(outcomeListener, this, rpcType, protobufBody, clazz, dataBodies); - + client.send(outcomeListener, this, rpcType, protobufBody, clazz, dataBodies); + } - + @Override public int hashCode() { final int prime = 31; @@ -75,6 +75,6 @@ public class DataClientConnection extends RemoteConnection{ } public void shutdownIfClient(){ - Closeables.closeQuietly(client); + Closeables.closeQuietly(client); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java index bd0e9e0..b2ea855 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java @@ -30,7 +30,7 @@ public class DataConnectionManager extends ReconnectingConnection<DataClientConn private final DrillbitEndpoint endpoint; private final BootStrapContext context; - + public DataConnectionManager(FragmentHandle handle, DrillbitEndpoint endpoint, BootStrapContext context) { super(hs(handle), endpoint.getAddress(), endpoint.getDataPort()); this.endpoint = endpoint; @@ -41,7 +41,7 @@ public class DataConnectionManager extends ReconnectingConnection<DataClientConn protected DataClient getNewClient() { return new DataClient(endpoint, context, new CloseHandlerCreator()); } - + private static BitClientHandshake hs(FragmentHandle handle){ return BitClientHandshake // .newBuilder() // http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java index 270dae6..fab275a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java @@ -28,7 +28,7 @@ import com.google.protobuf.MessageLite; public class DataDefaultInstanceHandler { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataDefaultInstanceHandler.class); - + public static MessageLite getResponseDefaultInstanceClient(int rpcType) throws RpcException { switch (rpcType) { @@ -36,12 +36,12 @@ public class DataDefaultInstanceHandler { return Ack.getDefaultInstance(); case RpcType.HANDSHAKE_VALUE: return BitServerHandshake.getDefaultInstance(); - + default: throw new UnsupportedOperationException(); } } - + public static MessageLite getResponseDefaultInstanceServer(int rpcType) throws RpcException { switch (rpcType) { case RpcType.ACK_VALUE: @@ -50,7 +50,7 @@ public class DataDefaultInstanceHandler { return BitClientHandshake.getDefaultInstance(); case RpcType.REQ_RECORD_BATCH_VALUE: return FragmentRecordBatch.getDefaultInstance(); - + default: throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java index b648c72..193b050 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java @@ -30,9 +30,10 @@ public class DataProtobufLengthDecoder extends ProtobufLengthDecoder{ public DataProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { super(allocator, outOfMemoryHandler); - + } - + + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { super.decode(ctx, in, out); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java index 45d95d9..6d0901f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java @@ -22,7 +22,6 @@ import org.apache.drill.exec.proto.BitData.BitClientHandshake; import org.apache.drill.exec.proto.BitData.BitServerHandshake; import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; import org.apache.drill.exec.proto.BitData.RpcType; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.Response; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java index 8611b5a..8e503ec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java @@ -17,8 +17,8 @@ */ package org.apache.drill.exec.rpc.data; -import io.netty.buffer.DrillBuf; import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.GenericFutureListener; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java index 546f967..5aa4aa6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java @@ -28,9 +28,9 @@ import org.apache.drill.exec.record.FragmentWritableBatch; import org.apache.drill.exec.rpc.DrillRpcFuture; import org.apache.drill.exec.rpc.FutureBitCommand; import org.apache.drill.exec.rpc.ListeningCommand; +import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; -import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType; public class DataTunnel { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java index 4bbb13e..b12a4cf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java @@ -27,7 +27,6 @@ import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryResult; import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; -import org.apache.drill.exec.proto.beans.DrillPBError; import org.apache.drill.exec.rpc.BaseRpcOutcomeListener; import org.apache.drill.exec.rpc.RpcBus; import org.apache.drill.exec.rpc.RpcException; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java index d49a9fd..f352a15 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java @@ -84,6 +84,7 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType)); } + @Override protected Response handleReponse(ConnectionThrottle throttle, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException { switch (rpcType) { case RpcType.QUERY_RESULT_VALUE: http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java index 99e7777..266f112 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java @@ -30,8 +30,9 @@ public class UserProtobufLengthDecoder extends ProtobufLengthDecoder{ public UserProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { super(allocator, outOfMemoryHandler); - + } + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { super.decode(ctx, in, out); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java index 71f5994..9f83a4f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java @@ -21,9 +21,9 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.rpc.RpcException; public interface UserResultsListener { - + public abstract void queryIdArrived(QueryId queryId); - public abstract void submissionFailed(RpcException ex); + public abstract void submissionFailed(RpcException ex); public abstract void resultArrived(QueryResultBatch result, ConnectionThrottle throttle); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java index 9b5e830..e386ad3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java @@ -33,7 +33,6 @@ import org.apache.drill.exec.proto.UserProtos.RequestResults; import org.apache.drill.exec.proto.UserProtos.RpcType; import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake; -import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.BasicServer; import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java index e7be380..2710837 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java @@ -17,15 +17,17 @@ */ package org.apache.drill.exec.rpc.user; -import com.google.common.collect.Maps; +import java.util.Map; + import net.hydromatic.optiq.SchemaPlus; + import org.apache.drill.exec.proto.UserBitShared.UserCredentials; import org.apache.drill.exec.proto.UserProtos.Property; import org.apache.drill.exec.proto.UserProtos.UserProperties; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.SessionOptionManager; -import java.util.Map; +import com.google.common.collect.Maps; public class UserSession { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserSession.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DataRecord.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DataRecord.java b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DataRecord.java index 6855e72..45f0683 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DataRecord.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DataRecord.java @@ -17,13 +17,13 @@ */ package org.apache.drill.exec.schema; +import java.util.List; +import java.util.Map; + import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import java.util.List; -import java.util.Map; - public class DataRecord { private final Map<Integer, Object> dataMap; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java index dbc4fd6..af41e9a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java @@ -17,10 +17,10 @@ */ package org.apache.drill.exec.schema; -import com.google.common.collect.Lists; - import java.util.List; +import com.google.common.collect.Lists; + public class DiffSchema { List<Field> addedFields; List<Field> removedFields; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java index 14049ef..35732ac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.schema; -import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MajorType; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java index b2f1c45..a8b13a1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.schema; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import java.util.List; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java index c207d39..20e055d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java @@ -40,6 +40,7 @@ public class NamedField extends Field { this.keyType = keyType; } + @Override public String getFieldName() { return fieldName; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ObjectSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ObjectSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ObjectSchema.java index 13f0dc1..19103d3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ObjectSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ObjectSchema.java @@ -17,14 +17,14 @@ */ package org.apache.drill.exec.schema; +import java.util.List; +import java.util.Map; + import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import java.util.List; -import java.util.Map; - public class ObjectSchema implements RecordSchema { private final Map<String, Field> fields; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Record.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Record.java b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Record.java index 096ef4c..6fc3e8f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Record.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Record.java @@ -17,10 +17,6 @@ */ package org.apache.drill.exec.schema; -import org.apache.drill.common.expression.PathSegment; -import org.apache.drill.common.expression.SchemaPath; - -import java.io.IOException; public interface Record { public DiffSchema getSchemaChanges(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java index 1e54336..93228c0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java @@ -17,13 +17,14 @@ */ package org.apache.drill.exec.schema.json.jackson; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; +import java.io.IOException; + import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; -import java.io.IOException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; public class JacksonHelper { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java index 41d13a0..3da2ea9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java @@ -18,8 +18,6 @@ package org.apache.drill.exec.server; import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; import java.io.Closeable; @@ -28,7 +26,6 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.metrics.DrillMetrics; -import org.apache.drill.exec.rpc.NamedThreadFactory; import org.apache.drill.exec.rpc.TransportCheck; import com.codahale.metrics.MetricRegistry; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index 57e07a0..a9e11a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -19,8 +19,6 @@ package org.apache.drill.exec.server; import java.io.Closeable; -import javax.servlet.Servlet; - import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.cache.DistributedCache; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/server/StartupOptions.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/StartupOptions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/StartupOptions.java index df0ee05..57b1e78 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/StartupOptions.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/StartupOptions.java @@ -30,10 +30,10 @@ public class StartupOptions { @Parameter(names={"-h", "--help"}, description="Provide description of usage.", help=true) private boolean help = false; - + @Parameter(names= {"-d", "--debug"}, description="Whether you want to run the program in debug mode.", required=false) private boolean debug = false; - + @Parameter(names= {"-c", "--config"}, description="Configuration file you want to load. Defaults to loading 'drill-override.conf' from the classpath.", required=false) private String configLocation = null; @@ -62,5 +62,5 @@ public class StartupOptions { } return args; } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java index aaf5719..46d316b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java @@ -17,13 +17,14 @@ */ package org.apache.drill.exec.server.options; +import java.util.Iterator; +import java.util.Map; + +import org.eigenbase.sql.SqlLiteral; + import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; -import org.eigenbase.sql.SqlLiteral; - -import java.util.Iterator; -import java.util.Map; public class FragmentOptionManager implements OptionManager { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentOptionManager.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java index 8b5306f..7eda97e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java @@ -17,13 +17,14 @@ */ package org.apache.drill.exec.server.options; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import org.eigenbase.sql.SqlLiteral; - import java.util.Iterator; import java.util.Map; +import org.eigenbase.sql.SqlLiteral; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; + public class QueryOptionManager implements OptionManager { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java index 53c26c1..4268d02 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java @@ -17,13 +17,14 @@ */ package org.apache.drill.exec.server.options; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; +import java.util.Iterator; +import java.util.Map; + import org.apache.drill.exec.server.options.OptionValue.OptionType; import org.eigenbase.sql.SqlLiteral; -import java.util.Iterator; -import java.util.Map; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; public class SessionOptionManager implements OptionManager{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index d4ff627..40e2aaf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -17,7 +17,11 @@ */ package org.apache.drill.exec.server.options; -import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + import org.apache.commons.collections.IteratorUtils; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; @@ -30,10 +34,7 @@ import org.apache.drill.exec.store.sys.PStoreConfig; import org.apache.drill.exec.store.sys.PStoreProvider; import org.eigenbase.sql.SqlLiteral; -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; +import com.google.common.collect.Maps; public class SystemOptionManager implements OptionManager{ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java index 2f43374..0398215 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java @@ -17,14 +17,14 @@ */ package org.apache.drill.exec.server.options; +import java.math.BigDecimal; + import org.apache.drill.common.exceptions.ExpressionParsingException; import org.apache.drill.exec.server.options.OptionValue.Kind; import org.apache.drill.exec.server.options.OptionValue.OptionType; import org.eigenbase.sql.SqlLiteral; import org.eigenbase.util.NlsString; -import java.math.BigDecimal; - public class TypeValidators { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TypeValidators.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java index db07ab4..3e972b4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java @@ -17,12 +17,7 @@ */ package org.apache.drill.exec.server.rest; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.google.common.collect.Lists; -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.proto.CoordinationProtos; -import org.apache.drill.exec.work.WorkManager; -import org.glassfish.jersey.server.mvc.Viewable; +import java.util.List; import javax.inject.Inject; import javax.ws.rs.GET; @@ -30,7 +25,14 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import javax.xml.bind.annotation.XmlRootElement; -import java.util.List; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.work.WorkManager; +import org.glassfish.jersey.server.mvc.Viewable; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.google.common.collect.Lists; @Path("/") public class DrillRoot { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/MetricsResources.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/MetricsResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/MetricsResources.java index 802b61f..28a292b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/MetricsResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/MetricsResources.java @@ -17,13 +17,13 @@ */ package org.apache.drill.exec.server.rest; -import org.glassfish.jersey.server.mvc.Viewable; - import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; +import org.glassfish.jersey.server.mvc.Viewable; + @Path("/metrics") public class MetricsResources { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MetricsResources.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java index 003f8ae..ba54781 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/PluginConfigWrapper.java @@ -18,13 +18,14 @@ package org.apache.drill.exec.server.rest; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; +import javax.xml.bind.annotation.XmlRootElement; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.store.StoragePluginRegistry; -import javax.xml.bind.annotation.XmlRootElement; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; @XmlRootElement public class PluginConfigWrapper { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java index 63ecb98..d158ccc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java @@ -17,7 +17,16 @@ */ package org.apache.drill.exec.server.rest; -import com.google.common.base.Preconditions; +import java.text.DateFormat; +import java.text.NumberFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import java.util.Map; + import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; @@ -28,22 +37,13 @@ import org.apache.drill.exec.proto.UserBitShared.QueryProfile; import org.apache.drill.exec.proto.UserBitShared.StreamProfile; import org.apache.drill.exec.proto.helper.QueryIdHelper; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import java.text.DateFormat; -import java.text.NumberFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Locale; -import java.util.Map; - public class ProfileWrapper { public QueryProfile profile; @@ -528,6 +528,7 @@ public class ProfileWrapper { appendCell(Long.toString(l), link); } + @Override public String toString() { String rv; rv = sb.append("\n</table>").toString(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java index 8ad32b1..bea693c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java @@ -29,7 +29,6 @@ import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; -import javax.xml.bind.annotation.XmlRootElement; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.coord.ClusterCoordinator;