http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java index 7299274..da2e9eb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java @@ -27,7 +27,6 @@ import com.google.common.collect.Lists; public class BatchSchema implements Iterable<MaterializedField> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchSchema.class); final SelectionVectorMode selectionVectorMode; - ; private final List<MaterializedField> fields; BatchSchema(SelectionVectorMode selectionVector, List<MaterializedField> fields) { @@ -39,12 +38,14 @@ public class BatchSchema implements Iterable<MaterializedField> { return new SchemaBuilder(); } - public int getFieldCount(){ + public int getFieldCount() { return fields.size(); } - public MaterializedField getColumn(int index){ - if(index < 0 || index >= fields.size()) return null; + public MaterializedField getColumn(int index) { + if (index < 0 || index >= fields.size()) { + return null; + } return fields.get(index); } @@ -95,23 +96,27 @@ public class BatchSchema implements Iterable<MaterializedField> { @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } BatchSchema other = (BatchSchema) obj; if (fields == null) { - if (other.fields != null) + if (other.fields != null) { return false; - } else if (!fields.equals(other.fields)) + } + } else if (!fields.equals(other.fields)) { return false; - if (selectionVectorMode != other.selectionVectorMode) + } + if (selectionVectorMode != other.selectionVectorMode) { return false; + } return true; } - - }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java index 7872e08..9dbb583 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java @@ -33,11 +33,11 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper< private MaterializedField f; private final boolean releasable; - public HyperVectorWrapper(MaterializedField f, T[] v){ + public HyperVectorWrapper(MaterializedField f, T[] v) { this(f, v, true); } - public HyperVectorWrapper(MaterializedField f, T[] v, boolean releasable){ + public HyperVectorWrapper(MaterializedField f, T[] v, boolean releasable) { assert(v.length > 0); this.f = f; this.vectors = v; @@ -72,22 +72,26 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper< @Override public void clear() { - if(!releasable) return; - for(T x : vectors){ + if (!releasable) { + return; + } + for (T x : vectors) { x.clear(); } } @Override public VectorWrapper<?> getChildWrapper(int[] ids) { - if(ids.length == 1) return this; + if (ids.length == 1) { + return this; + } ValueVector[] vectors = new ValueVector[this.vectors.length]; int index = 0; - for(ValueVector v : this.vectors){ + for (ValueVector v : this.vectors) { ValueVector vector = v; - for(int i = 1; i < ids.length; i++){ + for (int i = 1; i < ids.length; i++) { MapVector map = (MapVector) vector; vector = map.getVectorById(ids[i]); } @@ -100,9 +104,11 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper< @Override public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) { ValueVector v = vectors[0]; - if(!expectedPath.getRootSegment().segmentEquals(v.getField().getPath().getRootSegment())) return null; + if (!expectedPath.getRootSegment().segmentEquals(v.getField().getPath().getRootSegment())) { + return null; + } - if(v instanceof AbstractContainerVector){ + if (v instanceof AbstractContainerVector) { // we're looking for a multi path. AbstractContainerVector c = (AbstractContainerVector) v; TypedFieldId.Builder builder = TypedFieldId.newBuilder(); @@ -111,7 +117,7 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper< builder.addId(id); return c.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild()); - }else{ + } else { return TypedFieldId.newBuilder() // .intermediateType(v.getField().getType()) // .finalType(v.getField().getType()) // @@ -126,7 +132,7 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper< public VectorWrapper<T> cloneAndTransfer() { return new HyperVectorWrapper<T>(f, vectors, false); // T[] newVectors = (T[]) Array.newInstance(vectors.getClass().getComponentType(), vectors.length); -// for(int i =0; i < newVectors.length; i++){ +// for(int i =0; i < newVectors.length; i++) { // TransferPair tp = vectors[i].getTransferPair(); // tp.transfer(); // newVectors[i] = (T) tp.getTo(); @@ -134,7 +140,7 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper< // return new HyperVectorWrapper<T>(f, newVectors); } - public static <T extends ValueVector> HyperVectorWrapper<T> create(MaterializedField f, T[] v, boolean releasable){ + public static <T extends ValueVector> HyperVectorWrapper<T> create(MaterializedField f, T[] v, boolean releasable) { return new HyperVectorWrapper<T>(f, v, releasable); } @@ -146,4 +152,5 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper< public void addVectors(ValueVector[] vv) { vectors = (T[]) ArrayUtils.add(vectors, vv); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java index 54e1136..328f6ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java @@ -53,10 +53,8 @@ public class MajorTypeSerDe { return jp.readValueAs(MajorTypeHolder.class).getMajorType(); } - } - public static class Se extends StdSerializer<MajorType> { public Se() { @@ -90,27 +88,40 @@ public class MajorTypeSerDe { this.scale = scale; } - private MajorTypeHolder(){} + private MajorTypeHolder() {} @JsonIgnore - public MajorType getMajorType(){ + public MajorType getMajorType() { MajorType.Builder b = MajorType.newBuilder(); b.setMode(mode); b.setMinorType(minorType); - if(precision != null) b.setPrecision(precision); - if(width != null) b.setWidth(width); - if(scale != null) b.setScale(scale); + if (precision != null) { + b.setPrecision(precision); + } + if (width != null) { + b.setWidth(width); + } + if (scale != null) { + b.setScale(scale); + } return b.build(); } - public static MajorTypeHolder get(MajorType mt){ + public static MajorTypeHolder get(MajorType mt) { MajorTypeHolder h = new MajorTypeHolder(); h.minorType = mt.getMinorType(); h.mode = mt.getMode(); - if(mt.hasPrecision()) h.precision = mt.getPrecision(); - if(mt.hasScale()) h.scale = mt.getScale(); - if(mt.hasWidth()) h.width = mt.getWidth(); + if (mt.hasPrecision()) { + h.precision = mt.getPrecision(); + } + if (mt.hasScale()) { + h.scale = mt.getScale(); + } + if (mt.hasWidth()) { + h.width = mt.getWidth(); + } return h; } } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java index 540977d..0ed74fb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java @@ -64,7 +64,9 @@ public class MaterializedField { public String getLastName(){ PathSegment seg = key.path.getRootSegment(); - while(seg.getChild() != null) seg = seg.getChild(); + while (seg.getChild() != null) { + seg = seg.getChild(); + } return seg.getNameSegment().getPath(); } @@ -143,7 +145,7 @@ public class MaterializedField { public MaterializedField getOtherNullableVersion(){ MajorType mt = key.type; DataMode newDataMode = null; - switch(mt.getMode()){ + switch (mt.getMode()){ case OPTIONAL: newDataMode = DataMode.REQUIRED; break; @@ -161,7 +163,9 @@ public class MaterializedField { } public boolean matches(SchemaPath path) { - if(!path.isSimplePath()) return false; + if (!path.isSimplePath()) { + return false; + } return key.path.equals(path); } @@ -178,23 +182,30 @@ public class MaterializedField { @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } MaterializedField other = (MaterializedField) obj; if (children == null) { - if (other.children != null) + if (other.children != null) { return false; - } else if (!children.equals(other.children)) + } + } else if (!children.equals(other.children)) { return false; + } if (key == null) { - if (other.key != null) + if (other.key != null) { return false; - } else if (!key.equals(other.key)) + } + } else if (!key.equals(other.key)) { return false; + } return true; } @@ -237,26 +248,33 @@ public class MaterializedField { @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } Key other = (Key) obj; if (path == null) { - if (other.path != null) + if (other.path != null) { return false; - } else if (!path.equals(other.path)) + } + } else if (!path.equals(other.path)) { return false; + } if (type == null) { - if (other.type != null) + if (other.type != null) { return false; - } else if (!type.equals(other.type)) + } + } else if (!type.equals(other.type)) { return false; + } return true; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java index dd0f89a..e1725e6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java @@ -38,7 +38,9 @@ public class RawFragmentBatch { this.body = body; this.connection = connection; this.sender = sender; - if(body != null) body.retain(); + if (body != null) { + body.retain(); + } } public FragmentRecordBatch getHeader() { @@ -54,8 +56,10 @@ public class RawFragmentBatch { return "RawFragmentBatch [header=" + header + ", body=" + body + "]"; } - public void release(){ - if(body != null) body.release(); + public void release() { + if (body != null) { + body.release(); + } } public RemoteConnection getConnection() { @@ -66,7 +70,7 @@ public class RawFragmentBatch { return sender; } - public void sendOk(){ + public void sendOk() { sender.send(DataRpcConfig.OK); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java index 78214db..8bf346d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java @@ -57,8 +57,8 @@ public class SchemaBuilder { return this; } - public SchemaBuilder addFields(Iterable<MaterializedField> fields){ - for(MaterializedField f : fields){ + public SchemaBuilder addFields(Iterable<MaterializedField> fields) { + for (MaterializedField f : fields) { addField(f); } return this; @@ -86,7 +86,7 @@ public class SchemaBuilder { // fields.put(f.getFieldId(), f); // } - public SchemaBuilder addField(MaterializedField f){ + public SchemaBuilder addField(MaterializedField f) { fields.add(f); return this; } @@ -104,12 +104,14 @@ public class SchemaBuilder { // if (!fields.containsKey(fieldId)) // throw new SchemaChangeException( // String.format("An attempt was made to replace a field in the schema, however the schema does " + -// "not currently contain that field id. The offending fieldId was %d", fieldId)); +// "not currently contain that field id. The offending fieldId was %d", fieldId)); // setTypedField(fieldId, type, nullable, mode, valueClass); // } - public SchemaBuilder removeField(MaterializedField f) throws SchemaChangeException{ - if(!fields.remove(f)) throw new SchemaChangeException("You attempted to remove an nonexistent field."); + public SchemaBuilder removeField(MaterializedField f) throws SchemaChangeException { + if (!fields.remove(f)) { + throw new SchemaChangeException("You attempted to remove an nonexistent field."); + } return this; } @@ -118,8 +120,9 @@ public class SchemaBuilder { * @return * @throws SchemaChangeException */ - public BatchSchema build(){ + public BatchSchema build() { List<MaterializedField> fieldList = Lists.newArrayList(fields); return new BatchSchema(this.selectionVectorMode, fieldList); } -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java index 8a2312a..5bd3e41 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java @@ -29,7 +29,7 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper private T v; - public SimpleVectorWrapper(T v){ + public SimpleVectorWrapper(T v) { this.v = v; } @@ -72,18 +72,19 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper v.clear(); } - public static <T extends ValueVector> SimpleVectorWrapper<T> create(T v){ + public static <T extends ValueVector> SimpleVectorWrapper<T> create(T v) { return new SimpleVectorWrapper<T>(v); } @Override public VectorWrapper<?> getChildWrapper(int[] ids) { - if(ids.length == 1) return this; + if (ids.length == 1) { + return this; + } ValueVector vector = v; - - for(int i = 1; i < ids.length; i++){ + for (int i = 1; i < ids.length; i++) { MapVector map = (MapVector) vector; vector = map.getVectorById(ids[i]); } @@ -93,10 +94,12 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper @Override public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) { - if(!expectedPath.getRootSegment().segmentEquals(v.getField().getPath().getRootSegment())) return null; + if (!expectedPath.getRootSegment().segmentEquals(v.getField().getPath().getRootSegment())) { + return null; + } PathSegment seg = expectedPath.getRootSegment(); - if(v instanceof AbstractContainerVector){ + if (v instanceof AbstractContainerVector) { // we're looking for a multi path. AbstractContainerVector c = (AbstractContainerVector) v; TypedFieldId.Builder builder = TypedFieldId.newBuilder(); @@ -104,28 +107,26 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper builder.addId(id); return c.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild()); - }else{ + } else { TypedFieldId.Builder builder = TypedFieldId.newBuilder(); builder.intermediateType(v.getField().getType()); builder.addId(id); builder.finalType(v.getField().getType()); - if(seg.isLastPath()){ + if (seg.isLastPath()) { return builder.build(); - }else{ + } else { PathSegment child = seg.getChild(); - if(child.isArray() && child.isLastPath()){ + if (child.isArray() && child.isLastPath()) { builder.remainder(child); builder.withIndex(); builder.finalType(v.getField().getType().toBuilder().setMode(DataMode.OPTIONAL).build()); return builder.build(); - }else{ + } else { return null; } } - } } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java index acb56d6..f7cfefa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java @@ -36,15 +36,15 @@ public class TypedFieldId { final boolean isHyperReader; final PathSegment remainder; - public TypedFieldId(MajorType type, int... fieldIds){ + public TypedFieldId(MajorType type, int... fieldIds) { this(type, type, type, false, null, fieldIds); } - public TypedFieldId(MajorType type, IntArrayList breadCrumb, PathSegment remainder){ + public TypedFieldId(MajorType type, IntArrayList breadCrumb, PathSegment remainder) { this(type, type, type, false, remainder, breadCrumb.toArray()); } - public TypedFieldId(MajorType type, boolean isHyper, int... fieldIds){ + public TypedFieldId(MajorType type, boolean isHyper, int... fieldIds) { this(type, type, type, isHyper, null, fieldIds); } @@ -58,35 +58,35 @@ public class TypedFieldId { this.remainder = remainder; } - - - public TypedFieldId cloneWithChild(int id){ + public TypedFieldId cloneWithChild(int id) { int[] fieldIds = ArrayUtils.add(this.fieldIds, id); return new TypedFieldId(intermediateType, secondaryFinal, finalType, isHyperReader, remainder, fieldIds); } - public PathSegment getLastSegment(){ - if(remainder == null) return null; + public PathSegment getLastSegment() { + if (remainder == null) { + return null; + } PathSegment seg = remainder; - while(seg.getChild() != null){ + while (seg.getChild() != null) { seg = seg.getChild(); } return seg; } - public TypedFieldId cloneWithRemainder(PathSegment remainder){ + public TypedFieldId cloneWithRemainder(PathSegment remainder) { return new TypedFieldId(intermediateType, secondaryFinal, finalType, isHyperReader, remainder, fieldIds); } - public boolean hasRemainder(){ + public boolean hasRemainder() { return remainder != null; } - public PathSegment getRemainder(){ + public PathSegment getRemainder() { return remainder; } - public boolean isHyperReader(){ + public boolean isHyperReader() { return isHyperReader; } @@ -94,11 +94,11 @@ public class TypedFieldId { return intermediateType; } - public Class<? extends ValueVector> getIntermediateClass(){ + public Class<? extends ValueVector> getIntermediateClass() { return (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(intermediateType.getMinorType(), intermediateType.getMode()); } - public MajorType getFinalType(){ + public MajorType getFinalType() { return finalType; } @@ -106,13 +106,11 @@ public class TypedFieldId { return fieldIds; } - - public MajorType getSecondaryFinal() { return secondaryFinal; } - public static Builder newBuilder(){ + public static Builder newBuilder() { return new Builder(); } @@ -125,27 +123,27 @@ public class TypedFieldId { boolean hyperReader = false; boolean withIndex = false; - public Builder addId(int id){ + public Builder addId(int id) { ids.add(id); return this; } - public Builder withIndex(){ + public Builder withIndex() { withIndex = true; return this; } - public Builder remainder(PathSegment remainder){ + public Builder remainder(PathSegment remainder) { this.remainder = remainder; return this; } - public Builder hyper(){ + public Builder hyper() { this.hyperReader = true; return this; } - public Builder finalType(MajorType finalType){ + public Builder finalType(MajorType finalType) { this.finalType = finalType; return this; } @@ -155,17 +153,21 @@ public class TypedFieldId { return this; } - public Builder intermediateType(MajorType intermediateType){ + public Builder intermediateType(MajorType intermediateType) { this.intermediateType = intermediateType; return this; } - public TypedFieldId build(){ + public TypedFieldId build() { Preconditions.checkNotNull(intermediateType); Preconditions.checkNotNull(finalType); - if(intermediateType == null) intermediateType = finalType; - if (secondaryFinal == null) secondaryFinal = finalType; + if (intermediateType == null) { + intermediateType = finalType; + } + if (secondaryFinal == null) { + secondaryFinal = finalType; + } MajorType actualFinalType = finalType; //MajorType secondaryFinal = finalType; @@ -196,37 +198,50 @@ public class TypedFieldId { @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } TypedFieldId other = (TypedFieldId) obj; - if (!Arrays.equals(fieldIds, other.fieldIds)) + if (!Arrays.equals(fieldIds, other.fieldIds)) { return false; + } if (finalType == null) { - if (other.finalType != null) + if (other.finalType != null) { return false; - } else if (!finalType.equals(other.finalType)) + } + } else if (!finalType.equals(other.finalType)) { return false; + } if (intermediateType == null) { - if (other.intermediateType != null) + if (other.intermediateType != null) { return false; - } else if (!intermediateType.equals(other.intermediateType)) + } + } else if (!intermediateType.equals(other.intermediateType)) { return false; - if (isHyperReader != other.isHyperReader) + } + if (isHyperReader != other.isHyperReader) { return false; + } if (remainder == null) { - if (other.remainder != null) + if (other.remainder != null) { return false; - } else if (!remainder.equals(other.remainder)) + } + } else if (!remainder.equals(other.remainder)) { return false; + } if (secondaryFinal == null) { - if (other.secondaryFinal != null) + if (other.secondaryFinal != null) { return false; - } else if (!secondaryFinal.equals(other.secondaryFinal)) + } + } else if (!secondaryFinal.equals(other.secondaryFinal)) { return false; + } return true; } @@ -238,6 +253,4 @@ public class TypedFieldId { + ", remainder=" + remainder + "]"; } - - -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index ef09f39..e2f4a95 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -74,15 +74,15 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto add(vv, releasable); } - public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz){ + public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) { MaterializedField field = MaterializedField.create(name, type); ValueVector v = TypeHelper.getNewVector(field, this.oContext.getAllocator()); add(v); - if(clazz.isAssignableFrom(v.getClass())){ + if (clazz.isAssignableFrom(v.getClass())) { return (T) v; - }else{ + } else { throw new IllegalStateException(String.format("Vector requested [%s] was different than type stored [%s]. Drill doesn't yet support hetergenous types.", clazz.getSimpleName(), v.getClass().getSimpleName())); } } @@ -107,9 +107,7 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto public static VectorContainer canonicalize(VectorContainer original) { VectorContainer vc = new VectorContainer(); - List<VectorWrapper<?>> canonicalWrappers = new ArrayList<VectorWrapper<?>>(original.wrappers); - // Sort list of VectorWrapper alphabetically based on SchemaPath. Collections.sort(canonicalWrappers, new Comparator<VectorWrapper<?>>() { public int compare(VectorWrapper<?> v1, VectorWrapper<?> v2) { @@ -123,7 +121,6 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto return vc; } - private void cloneAndTransfer(VectorWrapper<?> wrapper) { wrappers.add(wrapper.cloneAndTransfer()); } @@ -145,6 +142,7 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto public void add(ValueVector[] hyperVector) { add(hyperVector, true); } + public void add(ValueVector[] hyperVector, boolean releasable) { assert hyperVector.length != 0; schema = null; @@ -167,7 +165,6 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto return; } } - throw new IllegalStateException("You attempted to remove a vector that didn't exist."); } @@ -175,7 +172,7 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto for (int i = 0; i < wrappers.size(); i++) { VectorWrapper<?> va = wrappers.get(i); TypedFieldId id = va.getFieldIdIfMatches(i, path); - if(id != null){ + if (id != null) { return id; } } @@ -183,15 +180,14 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto return null; } - - - @Override public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds) { Preconditions.checkArgument(fieldIds.length >= 1); VectorWrapper<?> va = wrappers.get(fieldIds[0]); - if(va == null) return null; + if (va == null) { + return null; + } if (fieldIds.length == 1 && clazz != null && !clazz.isAssignableFrom(va.getVectorClass())) { throw new IllegalStateException(String.format( @@ -242,7 +238,7 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto return recordCount; } - public void zeroVectors(){ + public void zeroVectors() { for (VectorWrapper<?> w : wrappers) { w.clear(); } @@ -252,17 +248,18 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto return this.wrappers.size(); } - public void allocateNew(){ + public void allocateNew() { for (VectorWrapper<?> w : wrappers) { w.getValueVector().allocateNew(); } } - public boolean allocateNewSafe(){ + public boolean allocateNewSafe() { for (VectorWrapper<?> w : wrappers) { - if(!w.getValueVector().allocateNewSafe()) return false; + if (!w.getValueVector().allocateNewSafe()) { + return false; + } } - return true; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java index b9690a6..308a8bc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java @@ -63,7 +63,7 @@ public class WritableBatch { "Attempted to reconstruct a container from a WritableBatch after it had been cleared"); if (buffers.length > 0) { /* If we have DrillBuf's associated with value vectors */ int len = 0; - for(DrillBuf b : buffers){ + for (DrillBuf b : buffers) { len += b.capacity(); } @@ -114,7 +114,9 @@ public class WritableBatch { } public void clear() { - if(cleared) return; + if(cleared) { + return; + } for (DrillBuf buf : buffers) { buf.release(); } @@ -157,8 +159,9 @@ public class WritableBatch { } public static WritableBatch get(RecordBatch batch) { - if (batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) + if (batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) { throw new UnsupportedOperationException("Only batches without hyper selections vectors are writable."); + } boolean sv2 = (batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE); return getBatchNoHVWrap(batch.getRecordCount(), batch, sv2); @@ -175,4 +178,5 @@ public class WritableBatch { buf.retain(increment); } } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java index fd0932c..69bc78f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java @@ -31,18 +31,20 @@ public class SelectionVector4 { private int length; public SelectionVector4(ByteBuf vector, int recordCount, int batchRecordCount) throws SchemaChangeException { - if(recordCount > Integer.MAX_VALUE /4) throw new SchemaChangeException(String.format("Currently, Drill can only support allocations up to 2gb in size. You requested an allocation of %d bytes.", recordCount * 4)); + if (recordCount > Integer.MAX_VALUE /4) { + throw new SchemaChangeException(String.format("Currently, Drill can only support allocations up to 2gb in size. You requested an allocation of %d bytes.", recordCount * 4)); + } this.recordCount = recordCount; this.start = 0; this.length = Math.min(batchRecordCount, recordCount); this.data = vector; } - public int getTotalCount(){ + public int getTotalCount() { return recordCount; } - public int getCount(){ + public int getCount() { return length; } @@ -51,14 +53,15 @@ public class SelectionVector4 { this.recordCount = length; } - public void set(int index, int compound){ + public void set(int index, int compound) { data.setInt(index*4, compound); } - public void set(int index, int recordBatch, int recordIndex){ + + public void set(int index, int recordBatch, int recordIndex) { data.setInt(index*4, (recordBatch << 16) | (recordIndex & 65535)); } - public int get(int index){ + public int get(int index) { return data.getInt( (start+index)*4); } @@ -67,7 +70,7 @@ public class SelectionVector4 { * @return Newly created single batch SelectionVector4. * @throws SchemaChangeException */ - public SelectionVector4 createNewWrapperCurrent(){ + public SelectionVector4 createNewWrapperCurrent() { try { data.retain(); SelectionVector4 sv4 = new SelectionVector4(data, recordCount, length); @@ -78,10 +81,10 @@ public class SelectionVector4 { } } - public boolean next(){ + public boolean next() { // logger.debug("Next called. Start: {}, Length: {}, recordCount: " + recordCount, start, length); - if(start + length >= recordCount){ + if (start + length >= recordCount) { start = recordCount; length = 0; @@ -96,7 +99,7 @@ public class SelectionVector4 { return true; } - public void clear(){ + public void clear() { start = 0; length = 0; if (data != DeadBuf.DEAD_BUFFER) { @@ -105,5 +108,4 @@ public class SelectionVector4 { } } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java index 8de9948..c0f8eba 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java @@ -30,15 +30,15 @@ public class SelectionVector4Builder { private List<BatchSchema> schemas = Lists.newArrayList(); - public void add(RecordBatch batch, boolean newSchema) throws SchemaChangeException{ - if(!schemas.isEmpty() && newSchema) throw new SchemaChangeException("Currently, the sv4 builder doesn't support embedded types"); - if(newSchema){ + public void add(RecordBatch batch, boolean newSchema) throws SchemaChangeException { + if (!schemas.isEmpty() && newSchema) { + throw new SchemaChangeException("Currently, the sv4 builder doesn't support embedded types"); + } + if (newSchema) { schemas.add(batch.getSchema()); } - } - // deals with managing selection vectors. // take a four byte int /** @@ -48,7 +48,7 @@ public class SelectionVector4Builder { * we should manage an array of valuevectors */ - private class VectorSchemaBuilder{ - + private class VectorSchemaBuilder { } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/DefaultFunctionResolver.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/DefaultFunctionResolver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/DefaultFunctionResolver.java index 9f02d74..4f128b3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/DefaultFunctionResolver.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/DefaultFunctionResolver.java @@ -37,7 +37,7 @@ public class DefaultFunctionResolver implements FunctionResolver { currcost = TypeCastRules.getCost(call, h); // if cost is lower than 0, func implementation is not matched, either w/ or w/o implicit casts - if (currcost < 0 ){ + if (currcost < 0 ) { continue; } @@ -51,8 +51,9 @@ public class DefaultFunctionResolver implements FunctionResolver { //did not find a matched func implementation, either w/ or w/o implicit casts //TODO: raise exception here? return null; - } else + } else { return bestmatch; + } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/FunctionResolver.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/FunctionResolver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/FunctionResolver.java index 2bd80a5..14d46c9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/FunctionResolver.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/FunctionResolver.java @@ -25,6 +25,6 @@ import org.apache.drill.exec.expr.fn.DrillFuncHolder; public interface FunctionResolver { - public DrillFuncHolder getBestMatch(List<DrillFuncHolder> methods, FunctionCall call); + public DrillFuncHolder getBestMatch(List<DrillFuncHolder> methods, FunctionCall call); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/FunctionResolverFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/FunctionResolverFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/FunctionResolverFactory.java index fa5a3ce..0205aef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/FunctionResolverFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/FunctionResolverFactory.java @@ -22,8 +22,8 @@ import org.apache.drill.common.expression.FunctionCall; public class FunctionResolverFactory { - public static FunctionResolver getResolver(FunctionCall call){ - return new DefaultFunctionResolver(); - } + public static FunctionResolver getResolver(FunctionCall call) { + return new DefaultFunctionResolver(); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/ResolverTypePrecedence.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/ResolverTypePrecedence.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/ResolverTypePrecedence.java index 838c49c..ea3155d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/ResolverTypePrecedence.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/ResolverTypePrecedence.java @@ -44,34 +44,34 @@ public class ResolverTypePrecedence { */ int i = 0; precedenceMap = new HashMap<MinorType, Integer>(); - precedenceMap.put(MinorType.NULL, i += 2); // NULL is legal to implicitly be promoted to any other type - precedenceMap.put(MinorType.FIXEDBINARY, i += 2); // Fixed-length is promoted to var length - precedenceMap.put(MinorType.VARBINARY, i += 2); + precedenceMap.put(MinorType.NULL, i += 2); // NULL is legal to implicitly be promoted to any other type + precedenceMap.put(MinorType.FIXEDBINARY, i += 2); // Fixed-length is promoted to var length + precedenceMap.put(MinorType.VARBINARY, i += 2); precedenceMap.put(MinorType.FIXEDCHAR, i += 2); - precedenceMap.put(MinorType.VARCHAR, i += 2); + precedenceMap.put(MinorType.VARCHAR, i += 2); precedenceMap.put(MinorType.FIXED16CHAR, i += 2); - precedenceMap.put(MinorType.VAR16CHAR, i += 2); - precedenceMap.put(MinorType.BIT, i += 2); - precedenceMap.put(MinorType.TINYINT, i += 2); //type with few bytes is promoted to type with more bytes ==> no data loss. - precedenceMap.put(MinorType.UINT1, i += 2); //signed is legal to implicitly be promoted to unsigned. - precedenceMap.put(MinorType.SMALLINT, i += 2); - precedenceMap.put(MinorType.UINT2, i += 2); - precedenceMap.put(MinorType.INT, i += 2); - precedenceMap.put(MinorType.UINT4, i += 2); - precedenceMap.put(MinorType.BIGINT, i += 2); - precedenceMap.put(MinorType.UINT8, i += 2); - precedenceMap.put(MinorType.MONEY, i += 2); - precedenceMap.put(MinorType.FLOAT4, i += 2); + precedenceMap.put(MinorType.VAR16CHAR, i += 2); + precedenceMap.put(MinorType.BIT, i += 2); + precedenceMap.put(MinorType.TINYINT, i += 2); //type with few bytes is promoted to type with more bytes ==> no data loss. + precedenceMap.put(MinorType.UINT1, i += 2); //signed is legal to implicitly be promoted to unsigned. + precedenceMap.put(MinorType.SMALLINT, i += 2); + precedenceMap.put(MinorType.UINT2, i += 2); + precedenceMap.put(MinorType.INT, i += 2); + precedenceMap.put(MinorType.UINT4, i += 2); + precedenceMap.put(MinorType.BIGINT, i += 2); + precedenceMap.put(MinorType.UINT8, i += 2); + precedenceMap.put(MinorType.MONEY, i += 2); + precedenceMap.put(MinorType.FLOAT4, i += 2); precedenceMap.put(MinorType.DECIMAL9, i += 2); precedenceMap.put(MinorType.DECIMAL18, i += 2); precedenceMap.put(MinorType.DECIMAL28DENSE, i += 2); precedenceMap.put(MinorType.DECIMAL28SPARSE, i += 2); precedenceMap.put(MinorType.DECIMAL38DENSE, i += 2); precedenceMap.put(MinorType.DECIMAL38SPARSE, i += 2); - precedenceMap.put(MinorType.FLOAT8, i += 2); - precedenceMap.put(MinorType.TIME, i += 2); - precedenceMap.put(MinorType.DATE, i += 2); - precedenceMap.put(MinorType.TIMESTAMP, i += 2); + precedenceMap.put(MinorType.FLOAT8, i += 2); + precedenceMap.put(MinorType.TIME, i += 2); + precedenceMap.put(MinorType.DATE, i += 2); + precedenceMap.put(MinorType.TIMESTAMP, i += 2); precedenceMap.put(MinorType.TIMETZ, i += 2); precedenceMap.put(MinorType.TIMESTAMPTZ, i += 2); precedenceMap.put(MinorType.INTERVALDAY, i+= 2); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java index d1ed95e..7969d49 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java @@ -770,7 +770,9 @@ public class TypeCastRules { } public static boolean isCastableWithNullHandling(MajorType from, MajorType to, NullHandling nullHandling) { - if (nullHandling == NullHandling.INTERNAL && from.getMode() != to.getMode()) return false; + if (nullHandling == NullHandling.INTERNAL && from.getMode() != to.getMode()) { + return false; + } return isCastable(from.getMinorType(), to.getMinorType()); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java index 78e4c10..9048241 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java @@ -42,17 +42,22 @@ public abstract class AbstractHandshakeHandler<T extends MessageLite> extends Me @Override protected void decode(ChannelHandlerContext ctx, InboundRpcMessage inbound, List<Object> outputs) throws Exception { - if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Received handshake {}", inbound); + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Received handshake {}", inbound); + } this.coordinationId = inbound.coordinationId; ctx.channel().pipeline().remove(this); - if (inbound.rpcType != handshakeType.getNumber()) + if (inbound.rpcType != handshakeType.getNumber()) { throw new RpcException(String.format("Handshake failure. Expected %s[%d] but received number [%d]", handshakeType, handshakeType.getNumber(), inbound.rpcType)); + } T msg = parser.parseFrom(inbound.getProtobufBodyAsIS()); consumeHandshake(ctx, msg); inbound.pBody.release(); - if(inbound.dBody != null) inbound.dBody.release(); + if (inbound.dBody != null) { + inbound.dBody.release(); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java index f7b3969..8f43b06 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java @@ -39,11 +39,11 @@ public class CoordinationQueue { } void channelClosed(Throwable ex) { - if(ex != null){ + if (ex != null) { RpcException e; - if(ex instanceof RpcException){ + if (ex instanceof RpcException) { e = (RpcException) ex; - }else{ + } else { e = new RpcException(ex); } for (RpcOutcome<?> f : map.values()) { @@ -52,13 +52,14 @@ public class CoordinationQueue { } } - public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler, Class<V> clazz, RemoteConnection connection){ + public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler, Class<V> clazz, RemoteConnection connection) { int i = circularInt.getNext(); RpcListener<V> future = new RpcListener<V>(handler, clazz, i, connection); Object old = map.put(i, future); - if (old != null) + if (old != null) { throw new IllegalStateException( "You attempted to reuse a coordination id when the previous coordination id has not been removed. This is likely rpc future callback memory leak."); + } return future; } @@ -79,11 +80,11 @@ public class CoordinationQueue { @Override public void operationComplete(ChannelFuture future) throws Exception { - if(!future.isSuccess()){ + if (!future.isSuccess()) { removeFromMap(coordinationId); - if(future.channel().isActive()) { + if (future.channel().isActive()) { throw new RpcException("Future failed") ; - }else{ + } else { throw new ChannelClosedException(); } } @@ -111,7 +112,6 @@ public class CoordinationQueue { return coordinationId; } - } private RpcOutcome<?> removeFromMap(int coordinationId) { @@ -130,7 +130,6 @@ public class CoordinationQueue { Class<?> outcomeClass = rpc.getOutcomeType(); if (outcomeClass != clazz) { - throw new IllegalStateException( String .format( @@ -149,11 +148,12 @@ public class CoordinationQueue { public void updateFailedFuture(int coordinationId, RpcFailure failure) { // logger.debug("Updating failed future."); - try{ + try { RpcOutcome<?> rpc = removeFromMap(coordinationId); rpc.setException(new RemoteRpcException(failure)); - }catch(Exception ex){ + } catch(Exception ex) { logger.warn("Failed to remove from map. Not a problem since we were updating on failed future.", ex); } } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java index 291c71a..19d9c30 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java @@ -76,10 +76,10 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple return buffer; } - public void release(){ - if(buffer != null) buffer.release(); + public void release() { + if (buffer != null) { + buffer.release(); + } } - - -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java index 012b9e4..d739034 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java @@ -35,16 +35,22 @@ public class InboundRpcMessage extends RpcMessage{ } @Override - public int getBodySize(){ + public int getBodySize() { int len = pBody.capacity(); - if(dBody != null) len += dBody.capacity(); + if (dBody != null) { + len += dBody.capacity(); + } return len; } @Override - void release(){ - if (pBody != null) pBody.release(); - if(dBody != null) dBody.release(); + void release() { + if (pBody != null) { + pBody.release(); + } + if (dBody != null) { + dBody.release(); + } } @Override @@ -53,7 +59,8 @@ public class InboundRpcMessage extends RpcMessage{ + coordinationId + ", dBody=" + dBody + "]"; } - public InputStream getProtobufBodyAsIS(){ + public InputStream getProtobufBodyAsIS() { return new ByteBufInputStream(pBody); } -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java index ef966cb..edad63e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java @@ -40,10 +40,10 @@ public class OutboundRpcMessage extends RpcMessage { // Netty doesn't traditionally release the reference on an unreadable buffer. However, we need to so that if we send a empty or unwritable buffer, we still release. otherwise we get weird memory leaks when sending empty vectors. List<ByteBuf> bufs = Lists.newArrayList(); - for(ByteBuf d : dBodies){ - if(d.readableBytes() == 0){ + for (ByteBuf d : dBodies) { + if (d.readableBytes() == 0) { d.release(); - }else{ + } else { bufs.add(d); } } @@ -58,12 +58,16 @@ public class OutboundRpcMessage extends RpcMessage { return len; } - public int getRawBodySize(){ - if(dBodies == null) return 0; + public int getRawBodySize() { + if (dBodies == null) { + return 0; + } int len = 0; for (int i = 0; i < dBodies.length; i++) { - if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Reader Index {}, Writer Index {}", dBodies[i].readerIndex(), dBodies[i].writerIndex()); + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Reader Index {}, Writer Index {}", dBodies[i].readerIndex(), dBodies[i].writerIndex()); + } len += dBodies[i].readableBytes(); } return len; @@ -76,13 +80,12 @@ public class OutboundRpcMessage extends RpcMessage { } @Override - void release(){ - if(dBodies != null){ - for(ByteBuf b : dBodies){ + void release() { + if (dBodies != null) { + for (ByteBuf b : dBodies) { b.release(); } } } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java index 02fb75e..4f075d3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java @@ -47,8 +47,9 @@ public class ProtobufLengthDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (!ctx.channel().isOpen()) { - if (in.readableBytes() > 0) + if (in.readableBytes() > 0) { logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes()); + } in.skipBytes(in.readableBytes()); return; } @@ -80,7 +81,7 @@ public class ProtobufLengthDecoder extends ByteToMessageDecoder { // need to make buffer copy, otherwise netty will try to refill this buffer if we move the readerIndex forward... // TODO: Can we avoid this copy? ByteBuf outBuf = allocator.buffer(length); - if(outBuf == null){ + if (outBuf == null) { logger.warn("Failure allocating buffer on incoming stream due to memory limits. Current Allocation: {}.", allocator.getAllocatedMemory()); in.resetReaderIndex(); outOfMemoryHandler.handle(); @@ -90,10 +91,11 @@ public class ProtobufLengthDecoder extends ByteToMessageDecoder { in.skipBytes(length); - if (RpcConstants.EXTRA_DEBUGGING) + if (RpcConstants.EXTRA_DEBUGGING) { logger.debug(String.format( "ReaderIndex is %d after length header of %d bytes and frame body of length %d bytes.", in.readerIndex(), i + 1, length)); + } out.add(outBuf); return; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java index 1675b52..f214c4d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java @@ -136,7 +136,9 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne break; } connection = connectionHolder.get(); - if (connection != null) break; + if (connection != null) { + break; + } } if (connection != incoming) { @@ -218,8 +220,9 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne break; } connection = connectionHolder.get(); - if (connection != null) + if (connection != null) { break; + } } if (connection == incoming) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java index 615bccc..a2a6d2a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java @@ -49,10 +49,11 @@ public class ResettableBarrier { while(true) { int c = getState(); - if (c == 0) + if (c == 0) { return false; + } int nextc = c - 1; - if (compareAndSetState(c, nextc)){ + if (compareAndSetState(c, nextc)) { return nextc == 0; } } @@ -79,7 +80,7 @@ public class ResettableBarrier { sync.releaseShared(1); } - public void closeBarrier(){ + public void closeBarrier() { // logger.debug("closing barrier."); sync.reset(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java index c6979e5..918ca0b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java @@ -81,10 +81,14 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType, SEND protobufBody, Class<RECEIVE> clazz, boolean allowInEventLoop, ByteBuf... dataBodies) { - if(!allowInEventLoop){ - if(connection.inEventLoop()) throw new IllegalStateException("You attempted to send while inside the rpc event thread. This isn't allowed because sending will block if the channel is backed up."); + if (!allowInEventLoop) { + if (connection.inEventLoop()) { + throw new IllegalStateException("You attempted to send while inside the rpc event thread. This isn't allowed because sending will block if the channel is backed up."); + } - if(!connection.blockOnNotWritable(listener)) return; + if (!connection.blockOnNotWritable(listener)) { + return; + } } ByteBuf pBuffer = null; @@ -102,11 +106,13 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp channelFuture.addListener(futureListener); channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); completed = true; - } catch(Exception | AssertionError e){ + } catch (Exception | AssertionError e) { listener.failed(new RpcException("Failure sending message.", e)); } finally { if (!completed) { - if (pBuffer != null) pBuffer.release(); + if (pBuffer != null) { + pBuffer.release(); + } if (dataBodies != null) { for (ByteBuf b : dataBodies) { b.release(); @@ -130,7 +136,9 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp } protected void closeQueueDueToChannelClose() { - if (this.isClient()) queue.channelClosed(new ChannelClosedException("Queue closed due to channel closure.")); + if (this.isClient()) { + queue.channelClosed(new ChannelClosedException("Queue closed due to channel closure.")); + } } protected GenericFutureListener<ChannelFuture> getCloseHandler(C clientConnection) { @@ -148,11 +156,13 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp this.coordinationId = coordinationId; } - public void send(Response r){ + public void send(Response r) { assert rpcConfig.checkResponseSend(r.rpcType, r.pBody.getClass()); OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, coordinationId, r.pBody, r.dBodies); - if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Adding message to outbound buffer. {}", outMessage); + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Adding message to outbound buffer. {}", outMessage); + } connection.getChannel().writeAndFlush(outMessage); } @@ -168,8 +178,12 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp @Override protected void decode(ChannelHandlerContext ctx, InboundRpcMessage msg, List<Object> output) throws Exception { - if (!ctx.channel().isOpen()) return; - if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Received message {}", msg); + if (!ctx.channel().isOpen()) { + return; + } + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Received message {}", msg); + } switch (msg.mode) { case REQUEST: { // handle message and ack. @@ -188,8 +202,10 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes())); rpcFuture.set(value, msg.dBody); msg.release(); // we release our ownership. Handle could have taken over ownership. - if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future {} with value {}", rpcFuture, value); - }catch(Exception ex){ + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Updated rpc future {} with value {}", rpcFuture, value); + } + }catch(Exception ex) { logger.error("Failure while handling response.", ex); throw ex; } @@ -199,8 +215,9 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp RpcFailure failure = RpcFailure.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes())); queue.updateFailedFuture(msg.coordinationId, failure); msg.release(); - if (RpcConstants.EXTRA_DEBUGGING) + if (RpcConstants.EXTRA_DEBUGGING) { logger.debug("Updated rpc future with coordinationId {} with failure ", msg.coordinationId, failure); + } break; default: @@ -252,4 +269,5 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp throw new RpcException(String.format("Failure while decoding message with parser of type. %s", parser.getClass().getCanonicalName()), e); } } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java index 3010f2b..b5974f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java @@ -31,7 +31,7 @@ public class RpcConfig { private final Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap; private final Map<Integer, RpcMessageType<?, ?, ?>> receiveMap; - private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap, Map<Integer, RpcMessageType<?, ?, ?>> receiveMap){ + private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap, Map<Integer, RpcMessageType<?, ?, ?>> receiveMap) { this.name = name; this.sendMap = ImmutableMap.copyOf(sendMap); this.receiveMap = ImmutableMap.copyOf(receiveMap); @@ -41,33 +41,51 @@ public class RpcConfig { return name; } - public boolean checkReceive(int rpcType, Class<?> receiveClass){ - if(RpcConstants.EXTRA_DEBUGGING) logger.debug(String.format("Checking reception for rpcType %d and receive class %s.", rpcType, receiveClass)); + public boolean checkReceive(int rpcType, Class<?> receiveClass) { + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug(String.format("Checking reception for rpcType %d and receive class %s.", rpcType, receiveClass)); + } RpcMessageType<?,?,?> type = receiveMap.get(rpcType); - if(type == null) throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc receive type number of %s.", name, rpcType)); + if (type == null) { + throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc receive type number of %s.", name, rpcType)); + } - if(receiveClass != type.getRet()){ + if (receiveClass != type.getRet()) { throw new IllegalStateException(String.format("%s: The definition for receive doesn't match implementation code. The definition is %s however the current receive for this type was of type %s.", name, type, receiveClass.getCanonicalName())); } return true; } - public boolean checkSend(EnumLite send, Class<?> sendClass, Class<?> receiveClass){ - if(RpcConstants.EXTRA_DEBUGGING) logger.debug(String.format("Checking send classes for send RpcType %s. Send Class is %s and Receive class is %s.", send, sendClass, receiveClass)); + public boolean checkSend(EnumLite send, Class<?> sendClass, Class<?> receiveClass) { + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug(String.format("Checking send classes for send RpcType %s. Send Class is %s and Receive class is %s.", send, sendClass, receiveClass)); + } RpcMessageType<?,?,?> type = sendMap.get(send); - if(type == null) throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc send type of %s.", name, send)); + if (type == null) { + throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc send type of %s.", name, send)); + } - if(type.getSend() != sendClass) throw new IllegalStateException(String.format("%s: The definition for send doesn't match implementation code. The definition is %s however the current send is trying to send an object of type %s.", name, type, sendClass.getCanonicalName())); - if(type.getRet() != receiveClass) throw new IllegalStateException(String.format("%s: The definition for send doesn't match implementation code. The definition is %s however the current send is trying to setup an expected reception of an object of type %s.", name, type, receiveClass.getCanonicalName())); + if (type.getSend() != sendClass) { + throw new IllegalStateException(String.format("%s: The definition for send doesn't match implementation code. The definition is %s however the current send is trying to send an object of type %s.", name, type, sendClass.getCanonicalName())); + } + if (type.getRet() != receiveClass) { + throw new IllegalStateException(String.format("%s: The definition for send doesn't match implementation code. The definition is %s however the current send is trying to setup an expected reception of an object of type %s.", name, type, receiveClass.getCanonicalName())); + } return true; } - public boolean checkResponseSend(EnumLite responseType, Class<?> responseClass){ - if(RpcConstants.EXTRA_DEBUGGING) logger.debug(String.format("Checking responce send of type %s with response class of %s.", responseType, responseClass)); + public boolean checkResponseSend(EnumLite responseType, Class<?> responseClass) { + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug(String.format("Checking responce send of type %s with response class of %s.", responseType, responseClass)); + } RpcMessageType<?,?,?> type = receiveMap.get(responseType.getNumber()); - if(type == null) throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc response of type %s.", name, responseType)); - if(type.getRet() != responseClass) throw new IllegalStateException(String.format("%s: The definition for the response doesn't match implementation code. The definition is %s however the current response is trying to response with an object of type %s.", name, type, responseClass.getCanonicalName())); + if (type == null) { + throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc response of type %s.", name, responseType)); + } + if (type.getRet() != responseClass) { + throw new IllegalStateException(String.format("%s: The definition for the response doesn't match implementation code. The definition is %s however the current response is trying to response with an object of type %s.", name, type, responseClass.getCanonicalName())); + } return true; } @@ -114,10 +132,9 @@ public class RpcConfig { + ret + "]"; } - } - public static RpcConfigBuilder newBuilder(String name){ + public static RpcConfigBuilder newBuilder(String name) { return new RpcConfigBuilder(name); } @@ -126,25 +143,21 @@ public class RpcConfig { private Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap = Maps.newHashMap(); private Map<Integer, RpcMessageType<?, ?, ?>> receiveMap = Maps.newHashMap(); - private RpcConfigBuilder(String name){ + private RpcConfigBuilder(String name) { this.name = name; } - public <SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite> RpcConfigBuilder add(T sendEnum, Class<SEND> send, T receiveEnum, Class<RECEIVE> rec){ + public <SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite> RpcConfigBuilder add(T sendEnum, Class<SEND> send, T receiveEnum, Class<RECEIVE> rec) { RpcMessageType<SEND, RECEIVE, T> type = new RpcMessageType<SEND, RECEIVE, T>(sendEnum, send, receiveEnum, rec); this.sendMap.put(sendEnum, type); this.receiveMap.put(receiveEnum.getNumber(), type); return this; } - public RpcConfig build(){ + public RpcConfig build() { return new RpcConfig(name, sendMap, receiveMap); - } - } - + } } - - http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java index f4fe64d..74a4afb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java @@ -37,18 +37,20 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> { private final AtomicLong messageCounter = new AtomicLong(); - public RpcDecoder(String name){ + public RpcDecoder(String name) { this.logger = org.slf4j.LoggerFactory.getLogger(RpcDecoder.class.getCanonicalName() + "-" + name); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception { - if(!ctx.channel().isOpen()){ + if (!ctx.channel().isOpen()) { return; } - if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Inbound rpc message received."); + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Inbound rpc message received."); + } // now, we know the entire message is in the buffer and the buffer is constrained to this message. Additionally, // this process should avoid reading beyond the end of this buffer so we inform the ByteBufInputStream to throw an @@ -59,7 +61,9 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> { checkTag(is, RpcEncoder.HEADER_TAG); final RpcHeader header = RpcHeader.parseDelimitedFrom(is); - if(RpcConstants.EXTRA_DEBUGGING) logger.debug(" post header read index {}", buffer.readerIndex()); + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug(" post header read index {}", buffer.readerIndex()); + } // read the protobuf body into a buffer. checkTag(is, RpcEncoder.PROTOBUF_BODY_TAG); @@ -67,9 +71,13 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> { final ByteBuf pBody = buffer.slice(buffer.readerIndex(), pBodyLength); buffer.skipBytes(pBodyLength); pBody.retain(); - if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Read protobuf body of length {} into buffer {}.", pBodyLength, pBody); + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Read protobuf body of length {} into buffer {}.", pBodyLength, pBody); + } - if(RpcConstants.EXTRA_DEBUGGING) logger.debug("post protobufbody read index {}", buffer.readerIndex()); + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("post protobufbody read index {}", buffer.readerIndex()); + } ByteBuf dBody = null; int dBodyLength = 0; @@ -77,16 +85,24 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> { // read the data body. if (buffer.readableBytes() > 0) { - if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Reading raw body, buffer has {} bytes available, is available {}.", buffer.readableBytes(), is.available()); + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Reading raw body, buffer has {} bytes available, is available {}.", buffer.readableBytes(), is.available()); + } checkTag(is, RpcEncoder.RAW_BODY_TAG); dBodyLength = readRawVarint32(is); - if(buffer.readableBytes() != dBodyLength) throw new CorruptedFrameException(String.format("Expected to receive a raw body of %d bytes but received a buffer with %d bytes.", dBodyLength, buffer.readableBytes())); + if (buffer.readableBytes() != dBodyLength) { + throw new CorruptedFrameException(String.format("Expected to receive a raw body of %d bytes but received a buffer with %d bytes.", dBodyLength, buffer.readableBytes())); + } dBody = buffer.slice(); dBody.retain(); - if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Read raw body of {}", dBody); + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Read raw body of {}", dBody); + } }else{ - if(RpcConstants.EXTRA_DEBUGGING) logger.debug("No need to read raw body, no readable bytes left."); + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("No need to read raw body, no readable bytes left."); + } } @@ -97,14 +113,16 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> { // move the reader index forward so the next rpc call won't try to work with it. buffer.skipBytes(dBodyLength); messageCounter.incrementAndGet(); - if (RpcConstants.SOME_DEBUGGING) logger.debug("Inbound Rpc Message Decoded {}.", m); + if (RpcConstants.SOME_DEBUGGING) { + logger.debug("Inbound Rpc Message Decoded {}.", m); + } out.add(m); } private void checkTag(ByteBufInputStream is, int expectedTag) throws IOException { int actualTag = readRawVarint32(is); - if (actualTag != expectedTag){ + if (actualTag != expectedTag) { throw new CorruptedFrameException(String.format("Expected to read a tag of %d but actually received a value of %d. Happened after reading %d message.", expectedTag, actualTag, messageCounter.get())); } } @@ -143,4 +161,5 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> { } return result; } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java index 8bf3483..34256f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java @@ -45,22 +45,26 @@ class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{ static final int PROTOBUF_BODY_TAG_LENGTH = getRawVarintSize(PROTOBUF_BODY_TAG); static final int RAW_BODY_TAG_LENGTH = getRawVarintSize(RAW_BODY_TAG); - public RpcEncoder(String name){ + public RpcEncoder(String name) { this.logger = org.slf4j.LoggerFactory.getLogger(RpcEncoder.class.getCanonicalName() + "-" + name); } @Override protected void encode(ChannelHandlerContext ctx, OutboundRpcMessage msg, List<Object> out) throws Exception { - if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Rpc Encoder called with msg {}", msg); + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Rpc Encoder called with msg {}", msg); + } - if(!ctx.channel().isOpen()){ + if (!ctx.channel().isOpen()) { //output.add(ctx.alloc().buffer(0)); logger.debug("Channel closed, skipping encode."); return; } try{ - if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Encoding outbound message {}", msg); + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Encoding outbound message {}", msg); + } // first we build the RpcHeader RpcHeader header = RpcHeader.newBuilder() // .setMode(msg.mode) // @@ -75,7 +79,7 @@ class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{ HEADER_TAG_LENGTH + getRawVarintSize(headerLength) + headerLength + // PROTOBUF_BODY_TAG_LENGTH + getRawVarintSize(protoBodyLength) + protoBodyLength; // - if(rawBodyLength > 0){ + if (rawBodyLength > 0) { fullLength += (RAW_BODY_TAG_LENGTH + getRawVarintSize(rawBodyLength) + rawBodyLength); } @@ -97,8 +101,10 @@ class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{ msg.pBody.writeTo(cos); // if exists, write data body and tag. - if(msg.getRawBodySize() > 0){ - if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Writing raw body of size {}", msg.getRawBodySize()); + if (msg.getRawBodySize() > 0) { + if(RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Writing raw body of size {}", msg.getRawBodySize()); + } cos.writeRawVarint32(RAW_BODY_TAG); cos.writeRawVarint32(rawBodyLength); @@ -107,23 +113,24 @@ class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{ CompositeByteBuf cbb = new CompositeByteBuf(buf.alloc(), true, msg.dBodies.length + 1); cbb.addComponent(buf); int bufLength = buf.readableBytes(); - for(ByteBuf b : msg.dBodies){ + for (ByteBuf b : msg.dBodies) { cbb.addComponent(b); bufLength += b.readableBytes(); } cbb.writerIndex(bufLength); out.add(cbb); - - - }else{ + } else { cos.flush(); out.add(buf); } - if(RpcConstants.SOME_DEBUGGING) logger.debug("Wrote message length {}:{} bytes (head:body). Message: " + msg, getRawVarintSize(fullLength), fullLength); - if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Sent message. Ending writer index was {}.", buf.writerIndex()); - - }finally{ + if (RpcConstants.SOME_DEBUGGING) { + logger.debug("Wrote message length {}:{} bytes (head:body). Message: " + msg, getRawVarintSize(fullLength), fullLength); + } + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Sent message. Ending writer index was {}.", buf.writerIndex()); + } + } finally { // make sure to release Rpc Messages underlying byte buffers. //msg.release(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java index 3d8f02b..eb870b3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java @@ -49,14 +49,20 @@ public class RpcException extends DrillIOException{ super(cause); } - public static RpcException mapException(Throwable t){ - while(t instanceof ExecutionException) t = ((ExecutionException)t).getCause(); - if(t instanceof RpcException) return ((RpcException) t); + public static RpcException mapException(Throwable t) { + while (t instanceof ExecutionException) { + t = ((ExecutionException)t).getCause(); + } + if (t instanceof RpcException) { + return ((RpcException) t); + } return new RpcException(t); } - public static RpcException mapException(String message, Throwable t){ - while(t instanceof ExecutionException) t = ((ExecutionException)t).getCause(); + public static RpcException mapException(String message, Throwable t) { + while (t instanceof ExecutionException) { + t = ((ExecutionException)t).getCause(); + } return new RpcException(message, t); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java index 37c9ce2..06d6e77 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java @@ -26,7 +26,7 @@ import org.apache.drill.exec.work.batch.ControlMessageHandler; import com.google.common.collect.Maps; -public class ConnectionManagerRegistry implements Iterable<ControlConnectionManager>{ +public class ConnectionManagerRegistry implements Iterable<ControlConnectionManager> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectionManagerRegistry.class); private final ConcurrentMap<DrillbitEndpoint, ControlConnectionManager> registry = Maps.newConcurrentMap(); @@ -41,13 +41,15 @@ public class ConnectionManagerRegistry implements Iterable<ControlConnectionMana this.context = context; } - public ControlConnectionManager getConnectionManager(DrillbitEndpoint endpoint){ + public ControlConnectionManager getConnectionManager(DrillbitEndpoint endpoint) { assert localEndpoint != null : "DrillbitEndpoint must be set before a connection manager can be retrieved"; ControlConnectionManager m = registry.get(endpoint); - if(m == null){ + if (m == null) { m = new ControlConnectionManager(endpoint, localEndpoint, handler, context); ControlConnectionManager m2 = registry.putIfAbsent(endpoint, m); - if(m2 != null) m = m2; + if (m2 != null) { + m = m2; + } } return m; @@ -58,7 +60,7 @@ public class ConnectionManagerRegistry implements Iterable<ControlConnectionMana return registry.values().iterator(); } - public void setEndpoint(DrillbitEndpoint endpoint){ + public void setEndpoint(DrillbitEndpoint endpoint) { this.localEndpoint = endpoint; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java index 879df40..d546db3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java @@ -86,7 +86,9 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo @Override protected void validateHandshake(BitControlHandshake handshake) throws RpcException { - if(handshake.getRpcVersion() != ControlRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", handshake.getRpcVersion(), ControlRpcConfig.RPC_VERSION)); + if (handshake.getRpcVersion() != ControlRpcConfig.RPC_VERSION) { + throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", handshake.getRpcVersion(), ControlRpcConfig.RPC_VERSION)); + } } @Override @@ -94,7 +96,7 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo connection.setEndpoint(handshake.getEndpoint()); } - public ControlConnection getConnection(){ + public ControlConnection getConnection() { return this.connection; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java index 6ac6dd5..a7aaa9c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java @@ -87,28 +87,35 @@ public class ControlConnection extends RemoteConnection { @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } ControlConnection other = (ControlConnection) obj; if (id == null) { - if (other.id != null) + if (other.id != null) { return false; - } else if (!id.equals(other.id)) + } + } else if (!id.equals(other.id)) { return false; + } return true; } public void shutdownIfClient() { - if (bus.isClient()) + if (bus.isClient()) { Closeables.closeQuietly(bus); + } } @Override public BufferAllocator getAllocator() { return allocator; } -} \ No newline at end of file + +}