http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java index faca32a..39bdb94 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java @@ -66,31 +66,32 @@ public final class JoinStatus { this.joinType = output.getJoinType(); } - private final IterOutcome nextLeft(){ + private final IterOutcome nextLeft() { return outputBatch.next(LEFT_INPUT, left); } - private final IterOutcome nextRight(){ + private final IterOutcome nextRight() { return outputBatch.next(RIGHT_INPUT, right); } - public final void ensureInitial(){ - if(!initialSet){ + public final void ensureInitial() { + if(!initialSet) { this.lastLeft = nextLeft(); this.lastRight = nextRight(); initialSet = true; } } - public final void advanceLeft(){ + public final void advanceLeft() { leftPosition++; } - public final void advanceRight(){ - if (rightSourceMode == RightSourceMode.INCOMING) + public final void advanceRight() { + if (rightSourceMode == RightSourceMode.INCOMING) { rightPosition++; - else + } else { svRightPosition++; + } } public final int getLeftPosition() { @@ -101,7 +102,7 @@ public final class JoinStatus { return (rightSourceMode == RightSourceMode.INCOMING) ? rightPosition : svRightPosition; } - public final int getRightCount(){ + public final int getRightCount() { return right.getRecordCount(); } @@ -153,9 +154,10 @@ public final class JoinStatus { * Check if the left record position can advance by one. * Side effect: advances to next left batch if current left batch size is exceeded. */ - public final boolean isLeftPositionAllowed(){ - if (lastLeft == IterOutcome.NONE) + public final boolean isLeftPositionAllowed() { + if (lastLeft == IterOutcome.NONE) { return false; + } if (!isLeftPositionInCurrentBatch()) { leftPosition = 0; releaseData(left); @@ -170,11 +172,13 @@ public final class JoinStatus { * Check if the right record position can advance by one. * Side effect: advances to next right batch if current right batch size is exceeded */ - public final boolean isRightPositionAllowed(){ - if (rightSourceMode == RightSourceMode.SV4) + public final boolean isRightPositionAllowed() { + if (rightSourceMode == RightSourceMode.SV4) { return svRightPosition < sv4.getCount(); - if (lastRight == IterOutcome.NONE) + } + if (lastRight == IterOutcome.NONE) { return false; + } if (!isRightPositionInCurrentBatch()) { rightPosition = 0; releaseData(right); @@ -185,11 +189,13 @@ public final class JoinStatus { return true; } - private void releaseData(RecordBatch b){ - for(VectorWrapper<?> v : b){ + private void releaseData(RecordBatch b) { + for (VectorWrapper<?> v : b) { v.clear(); } - if(b.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) b.getSelectionVector2().clear(); + if (b.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) { + b.getSelectionVector2().clear(); + } } /** @@ -220,29 +226,34 @@ public final class JoinStatus { return rightPosition + 1 < right.getRecordCount(); } - public JoinOutcome getOutcome(){ - if (!ok) + public JoinOutcome getOutcome() { + if (!ok) { return JoinOutcome.FAILURE; + } if (bothMatches(IterOutcome.NONE) || (joinType == JoinRelType.INNER && eitherMatches(IterOutcome.NONE)) || (joinType == JoinRelType.LEFT && lastLeft == IterOutcome.NONE) || - (joinType == JoinRelType.RIGHT && lastRight == IterOutcome.NONE)) + (joinType == JoinRelType.RIGHT && lastRight == IterOutcome.NONE)) { return JoinOutcome.NO_MORE_DATA; + } if (bothMatches(IterOutcome.OK) || - (eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK))) + (eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK))) { return JoinOutcome.BATCH_RETURNED; - if (eitherMatches(IterOutcome.OK_NEW_SCHEMA)) + } + if (eitherMatches(IterOutcome.OK_NEW_SCHEMA)) { return JoinOutcome.SCHEMA_CHANGED; - if (eitherMatches(IterOutcome.NOT_YET)) + } + if (eitherMatches(IterOutcome.NOT_YET)) { return JoinOutcome.WAITING; + } return JoinOutcome.FAILURE; } - private boolean bothMatches(IterOutcome outcome){ + private boolean bothMatches(IterOutcome outcome) { return lastLeft == outcome && lastRight == outcome; } - private boolean eitherMatches(IterOutcome outcome){ + private boolean eitherMatches(IterOutcome outcome) { return lastLeft == outcome || lastRight == outcome; }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java index bb3b9ac..c1dffc1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java @@ -94,8 +94,9 @@ public abstract class JoinTemplate implements JoinWorker { if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) { // we've hit the end of the right record batch; copy any remaining values from the left batch while (status.isLeftPositionAllowed()) { - if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) + if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) { return false; + } status.incOutputPos(); status.advanceLeft(); @@ -103,8 +104,9 @@ public abstract class JoinTemplate implements JoinWorker { } return true; } - if (!status.isLeftPositionAllowed()) + if (!status.isLeftPositionAllowed()) { return true; + } int comparison = doCompare(status.getLeftPosition(), status.getRightPosition()); switch (comparison) { @@ -112,8 +114,9 @@ public abstract class JoinTemplate implements JoinWorker { case -1: // left key < right key if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) { - if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) + if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) { return false; + } status.incOutputPos(); } status.advanceLeft(); @@ -125,25 +128,27 @@ public abstract class JoinTemplate implements JoinWorker { // check for repeating values on the left side if (!status.isLeftRepeating() && status.isNextLeftPositionInCurrentBatch() && - doCompareNextLeftKey(status.getLeftPosition()) == 0) + doCompareNextLeftKey(status.getLeftPosition()) == 0) { // subsequent record(s) in the left batch have the same key status.notifyLeftRepeating(); - - else if (status.isLeftRepeating() && + } else if (status.isLeftRepeating() && status.isNextLeftPositionInCurrentBatch() && - doCompareNextLeftKey(status.getLeftPosition()) != 0) + doCompareNextLeftKey(status.getLeftPosition()) != 0) { // this record marks the end of repeated keys status.notifyLeftStoppedRepeating(); + } boolean crossedBatchBoundaries = false; int initialRightPosition = status.getRightPosition(); do { // copy all equal right keys to the output record batch - if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) + if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) { return false; + } - if (!doCopyRight(status.getRightPosition(), status.getOutPosition())) + if (!doCopyRight(status.getRightPosition(), status.getOutPosition())) { return false; + } status.incOutputPos(); @@ -159,9 +164,10 @@ public abstract class JoinTemplate implements JoinWorker { } while ((!status.isLeftRepeating() || status.isRightPositionInCurrentBatch()) && status.isRightPositionAllowed() && doCompare(status.getLeftPosition(), status.getRightPosition()) == 0); if (status.getRightPosition() > initialRightPosition && - (status.isLeftRepeating() || ! status.isNextLeftPositionInCurrentBatch())) + (status.isLeftRepeating() || ! status.isNextLeftPositionInCurrentBatch())) { // more than one matching result from right table; reset position in case of subsequent left match status.setRightPosition(initialRightPosition); + } status.advanceLeft(); if (status.isLeftRepeating() && doCompareNextLeftKey(status.getLeftPosition()) != 0) { @@ -233,5 +239,4 @@ public abstract class JoinTemplate implements JoinWorker { */ protected abstract int doCompareNextLeftKey(@Named("leftIndex") int leftIndex); - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index b24b534..1d4e353 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -144,19 +144,21 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { status.ensureInitial(); // loop so we can start over again if we find a new batch was created. - while(true){ + while (true) { JoinOutcome outcome = status.getOutcome(); // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch. if (outcome == JoinOutcome.BATCH_RETURNED || - outcome == JoinOutcome.SCHEMA_CHANGED) + outcome == JoinOutcome.SCHEMA_CHANGED) { allocateBatch(); + } // reset the output position to zero after our parent iterates this RecordBatch if (outcome == JoinOutcome.BATCH_RETURNED || outcome == JoinOutcome.SCHEMA_CHANGED || - outcome == JoinOutcome.NO_MORE_DATA) + outcome == JoinOutcome.NO_MORE_DATA) { status.resetOutputPos(); + } if (outcome == JoinOutcome.NO_MORE_DATA) { logger.debug("NO MORE DATA; returning {} NONE"); @@ -164,7 +166,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } boolean first = false; - if(worker == null){ + if (worker == null) { try { logger.debug("Creating New Worker"); stats.startSetup(); @@ -180,11 +182,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } // join until we have a complete outgoing batch - if (!worker.doJoin(status)) + if (!worker.doJoin(status)) { worker = null; + } // get the outcome of the join. - switch(status.getOutcome()){ + switch (status.getOutcome()) { case BATCH_RETURNED: // only return new schema if new worker has been setup. logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK")); @@ -200,7 +203,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.NONE); case SCHEMA_CHANGED: worker = null; - if(status.getOutPosition() > 0){ + if (status.getOutPosition() > 0) { // if we have current data, let's return that. logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK")); setRecordCountInContainer(); @@ -218,7 +221,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } private void setRecordCountInContainer() { - for(VectorWrapper vw : container){ + for (VectorWrapper vw : container) { Preconditions.checkArgument(!vw.isHyper()); vw.getValueVector().getMutator().setValueCount(getRecordCount()); } @@ -257,9 +260,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { // materialize value vector readers from join expression final LogicalExpression materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry()); - if (collector.hasErrors()) + if (collector.hasErrors()) { throw new ClassTransformationException(String.format( "Failure while trying to materialize incoming left field. Errors:\n %s.", collector.toErrorString())); + } // generate compareNextLeftKey() //////////////////////////////// @@ -475,9 +479,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } else { materializedLeftExpr = new TypedNullConstant(Types.optional(MinorType.INT)); } - if (collector.hasErrors()) + if (collector.hasErrors()) { throw new ClassTransformationException(String.format( "Failure while trying to materialize incoming left field. Errors:\n %s.", collector.toErrorString())); + } LogicalExpression materializedRightExpr; if (worker == null || status.isRightPositionAllowed()) { @@ -485,9 +490,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } else { materializedRightExpr = new TypedNullConstant(Types.optional(MinorType.INT)); } - if (collector.hasErrors()) + if (collector.hasErrors()) { throw new ClassTransformationException(String.format( "Failure while trying to materialize incoming right field. Errors:\n %s.", collector.toErrorString())); + } // generate compare() //////////////////////// @@ -519,4 +525,5 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { //Pass the equality check for all the join conditions. Finally, return 0. cg.getEvalBlock()._return(JExpr.lit(0)); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java index 904d38c..1187bd6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java @@ -50,15 +50,24 @@ public class MergeJoinBatchBuilder { } public boolean add(RecordBatch batch) { - if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE) + if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE) { throw new UnsupportedOperationException("A merge join cannot currently work against a sv4 batch."); - if (batch.getRecordCount() == 0) return true; // skip over empty record batches. + } + if (batch.getRecordCount() == 0) { + return true; // skip over empty record batches. + } // resource checks long batchBytes = getSize(batch); - if (batchBytes + runningBytes > Integer.MAX_VALUE) return false; // TODO: 2GB is arbitrary - if (runningBatches++ >= Character.MAX_VALUE) return false; // allowed in batch. - if (!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available. + if (batchBytes + runningBytes > Integer.MAX_VALUE) { + return false; // TODO: 2GB is arbitrary + } + if (runningBatches++ >= Character.MAX_VALUE) { + return false; // allowed in batch. + } + if (!svAllocator.preAllocate(batch.getRecordCount()*4)) { + return false; // sv allocation available. + } // transfer VVs to a new RecordBatchData RecordBatchData bd = new RecordBatchData(batch); @@ -68,9 +77,9 @@ public class MergeJoinBatchBuilder { return true; } - private long getSize(RecordBatch batch){ + private long getSize(RecordBatch batch) { long bytes = 0; - for(VectorWrapper<?> v : batch){ + for (VectorWrapper<?> v : batch) { bytes += v.getValueVector().getBufferSize(); } return bytes; @@ -78,18 +87,20 @@ public class MergeJoinBatchBuilder { public void build() throws SchemaChangeException { container.clear(); - if (queuedRightBatches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE); + if (queuedRightBatches.size() > Character.MAX_VALUE) { + throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE); + } status.sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE); BatchSchema schema = queuedRightBatches.keySet().iterator().next(); List<RecordBatchData> data = queuedRightBatches.get(schema); // now we're going to generate the sv4 pointers - switch(schema.getSelectionVectorMode()){ + switch (schema.getSelectionVectorMode()) { case NONE: { int index = 0; int recordBatchId = 0; - for(RecordBatchData d : data){ - for(int i =0; i < d.getRecordCount(); i++, index++){ + for (RecordBatchData d : data) { + for (int i =0; i < d.getRecordCount(); i++, index++) { status.sv4.set(index, recordBatchId, i); } recordBatchId++; @@ -99,8 +110,8 @@ public class MergeJoinBatchBuilder { case TWO_BYTE: { int index = 0; int recordBatchId = 0; - for(RecordBatchData d : data){ - for(int i =0; i < d.getRecordCount(); i++, index++){ + for (RecordBatchData d : data) { + for (int i =0; i < d.getRecordCount(); i++, index++) { status.sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i)); } // might as well drop the selection vector since we'll stop using it now. @@ -121,7 +132,7 @@ public class MergeJoinBatchBuilder { } } - for(MaterializedField f : vectors.keySet()){ + for (MaterializedField f : vectors.keySet()) { List<ValueVector> v = vectors.get(f); container.addHyperList(v); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index cf2e36f..29fd80f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -133,7 +133,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> stats.startWait(); try { RawFragmentBatch b = provider.getNext(); - if(b != null){ + if (b != null) { stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount()); stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false); } @@ -191,7 +191,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> emptyBatch = rawBatch; } try { - while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0); + while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) { + ; + } if (rawBatch == null && context.isCancelled()) { return IterOutcome.STOP; } @@ -400,14 +402,17 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> batchOffsets[node.batchId] = 0; // add front value from batch[x] to priority queue - if (batchLoaders[node.batchId].getRecordCount() != 0) + if (batchLoaders[node.batchId].getRecordCount() != 0) { pqueue.add(new Node(node.batchId, 0)); + } } else { pqueue.add(new Node(node.batchId, node.valueIndex + 1)); } - if (prevBatchWasFull) break; + if (prevBatchWasFull) { + break; + } } // set the value counts in the outgoing vectors @@ -589,11 +594,13 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException { g.setMappingSet(MAIN_MAPPING); - for(Ordering od : popConfig.getOrderings()){ + for (Ordering od : popConfig.getOrderings()) { // first, we rewrite the evaluation stack for each side of the comparison. ErrorCollector collector = new ErrorCollectorImpl(); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry()); - if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } g.setMappingSet(LEFT_MAPPING); HoldingContainer left = g.addExpr(expr, false); g.setMappingSet(RIGHT_MAPPING); @@ -605,9 +612,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> HoldingContainer out = g.addExpr(fh, false); JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - if(od.getDirection() == Direction.ASCENDING){ + if (od.getDirection() == Direction.ASCENDING) { jc._then()._return(out.getValue()); - }else{ + } else { jc._then()._return(out.getValue().minus()); } } @@ -648,7 +655,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> public void cleanup() { outgoingContainer.clear(); if (batchLoaders != null) { - for(RecordBatchLoader rbl : batchLoaders){ + for (RecordBatchLoader rbl : batchLoaders) { if (rbl != null) { rbl.clear(); } @@ -662,4 +669,4 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index 45f32cf..aecf363 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -189,8 +189,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } builder.add(incoming); recordsSampled += incoming.getRecordCount(); - if (upstream == IterOutcome.NONE) + if (upstream == IterOutcome.NONE) { break; + } } VectorContainer sortedSamples = new VectorContainer(); builder.build(context, sortedSamples); @@ -258,7 +259,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart try { - if (!saveSamples()){ + if (!saveSamples()) { return false; } @@ -277,16 +278,17 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // Wait until sufficient number of fragments have submitted samples, or proceed after xx ms passed // TODO: this should be polling. - if (val < fragmentsBeforeProceed) + if (val < fragmentsBeforeProceed) { Thread.sleep(10); + } for (int i = 0; i < 100 && finalTable == null; i++) { finalTable = tableMap.get(finalTableKey); - if (finalTable != null){ + if (finalTable != null) { break; } Thread.sleep(10); } - if (finalTable == null){ + if (finalTable == null) { buildTable(); } finalTable = tableMap.get(finalTableKey); @@ -429,8 +431,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are // done - if (upstreamNone && (batchQueue == null || batchQueue.size() == 0)) + if (upstreamNone && (batchQueue == null || batchQueue.size() == 0)) { return IterOutcome.NONE; + } // if there are batches on the queue, process them first, rather than calling incoming.next() if (batchQueue != null && batchQueue.size() > 0) { @@ -461,7 +464,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // If this is the first iteration, we need to generate the partition vectors before we can proceed if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) { - if (!getPartitionVectors()){ + if (!getPartitionVectors()) { cleanup(); return IterOutcome.STOP; } @@ -490,8 +493,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // we need to generate a new schema, even if the outcome is IterOutcome.OK After that we can reuse the schema. if (this.startedUnsampledBatches == false) { this.startedUnsampledBatches = true; - if (upstream == IterOutcome.OK) + if (upstream == IterOutcome.OK) { upstream = IterOutcome.OK_NEW_SCHEMA; + } } switch (upstream) { case NONE: @@ -560,8 +564,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart int count = 0; for (Ordering od : popConfig.getOrderings()) { final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry()); - if (collector.hasErrors()) + if (collector.hasErrors()) { throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } cg.setMappingSet(incomingMapping); ClassGenerator.HoldingContainer left = cg.addExpr(expr, false); cg.setMappingSet(partitionMapping); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index 051a590..7f3a966 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -120,7 +120,9 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { @Override public void run() { try { - if (stop) return; + if (stop) { + return; + } outer: while (true) { IterOutcome upstream = incoming.next(); @@ -208,4 +210,5 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { this.failed = failed; } } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index ec29cac..a1a8340 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -195,55 +195,64 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ private boolean doAlloc() { //Allocate vv in the allocationVectors. - for(ValueVector v : this.allocationVectors){ + for (ValueVector v : this.allocationVectors) { //AllocationHelper.allocate(v, remainingRecordCount, 250); - if (!v.allocateNewSafe()) + if (!v.allocateNewSafe()) { return false; + } } //Allocate vv for complexWriters. - if (complexWriters == null) + if (complexWriters == null) { return true; + } - for (ComplexWriter writer : complexWriters) + for (ComplexWriter writer : complexWriters) { writer.allocate(); + } return true; } private void setValueCount(int count) { - for(ValueVector v : allocationVectors){ + for (ValueVector v : allocationVectors) { ValueVector.Mutator m = v.getMutator(); m.setValueCount(count); } - if (complexWriters == null) + if (complexWriters == null) { return; + } - for (ComplexWriter writer : complexWriters) + for (ComplexWriter writer : complexWriters) { writer.setValueCount(count); + } } /** hack to make ref and full work together... need to figure out if this is still necessary. **/ - private FieldReference getRef(NamedExpression e){ + private FieldReference getRef(NamedExpression e) { FieldReference ref = e.getRef(); PathSegment seg = ref.getRootSegment(); -// if(seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())){ +// if (seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())) { // return new FieldReference(ref.getPath().toString().subSequence(7, ref.getPath().length()), ref.getPosition()); // } return ref; } - private boolean isAnyWildcard(List<NamedExpression> exprs){ - for(NamedExpression e : exprs){ - if(isWildcard(e)) return true; + private boolean isAnyWildcard(List<NamedExpression> exprs) { + for (NamedExpression e : exprs) { + if (isWildcard(e)) { + return true; + } } return false; } - private boolean isWildcard(NamedExpression ex){ - if( !(ex.getExpr() instanceof SchemaPath)) return false; + private boolean isWildcard(NamedExpression ex) { + if ( !(ex.getExpr() instanceof SchemaPath)) { + return false; + } NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment(); NameSegment ref = ex.getRef().getRootSegment(); return ref.getPath().equals("*") && expr.getPath().equals("*"); @@ -266,7 +275,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ ClassifierResult result = new ClassifierResult(); boolean classify = isClassificationNeeded(exprs); - for(int i = 0; i < exprs.size(); i++){ + for (int i = 0; i < exprs.size(); i++) { final NamedExpression namedExpression = exprs.get(i); result.clear(); @@ -278,14 +287,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ Integer value = result.prefixMap.get(result.prefix); if (value != null && value.intValue() == 1) { int k = 0; - for(VectorWrapper<?> wrapper : incoming) { + for (VectorWrapper<?> wrapper : incoming) { ValueVector vvIn = wrapper.getValueVector(); SchemaPath originalPath = vvIn.getField().getPath(); if (k > result.outputNames.size()-1) { assert false; } String name = result.outputNames.get(k++); // get the renamed column names - if (name == EMPTY_STRING) continue; + if (name == EMPTY_STRING) { + continue; + } FieldReference ref = new FieldReference(name); TransferPair tp = wrapper.getValueVector().getTransferPair(ref); transfers.add(tp); @@ -293,17 +304,19 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } } else if (value != null && value.intValue() > 1) { // subsequent wildcards should do a copy of incoming valuevectors int k = 0; - for(VectorWrapper<?> wrapper : incoming) { + for (VectorWrapper<?> wrapper : incoming) { ValueVector vvIn = wrapper.getValueVector(); SchemaPath originalPath = vvIn.getField().getPath(); if (k > result.outputNames.size()-1) { assert false; } String name = result.outputNames.get(k++); // get the renamed column names - if (name == EMPTY_STRING) continue; + if (name == EMPTY_STRING) { + continue; + } final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incoming, collector, context.getFunctionRegistry() ); - if(collector.hasErrors()){ + if (collector.hasErrors()) { throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); } @@ -333,16 +346,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true); final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType()); - if(collector.hasErrors()){ + if (collector.hasErrors()) { throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); } // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack. - if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE + if (expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE && !((ValueVectorReadExpression) expr).hasReadPath() && !isAnyWildcard - && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0]) - ) { + && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])) { ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr; TypedFieldId id = vectorRead.getFieldId(); @@ -358,8 +370,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder()) { // Need to process ComplexWriter function evaluation. // Lazy initialization of the list of complex writers, if not done yet. - if (complexWriters == null) + if (complexWriters == null) { complexWriters = Lists.newArrayList(); + } // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer. ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef()); @@ -419,9 +432,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ private boolean isClassificationNeeded(List<NamedExpression> exprs) { boolean needed = false; - for(int i = 0; i < exprs.size(); i++){ + for (int i = 0; i < exprs.size(); i++) { final NamedExpression ex = exprs.get(i); - if (!(ex.getExpr() instanceof SchemaPath)) continue; + if (!(ex.getExpr() instanceof SchemaPath)) { + continue; + } NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment(); NameSegment ref = ex.getRef().getRootSegment(); boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); @@ -530,7 +545,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ result.outputNames.add(EMPTY_STRING); // initialize } - for(VectorWrapper<?> wrapper : incoming) { + for (VectorWrapper<?> wrapper : incoming) { ValueVector vvIn = wrapper.getValueVector(); String incomingName = vvIn.getField().getPath().getRootSegment().getPath(); // get the prefix of the name @@ -586,7 +601,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ result.outputNames.add(EMPTY_STRING); // initialize } - for(VectorWrapper<?> wrapper : incoming) { + for (VectorWrapper<?> wrapper : incoming) { ValueVector vvIn = wrapper.getValueVector(); String name = vvIn.getField().getPath().getRootSegment().getPath(); String[] components = name.split(StarColumnHelper.PREFIX_DELIMITER, 2); @@ -627,7 +642,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } int k = 0; - for(VectorWrapper<?> wrapper : incoming) { + for (VectorWrapper<?> wrapper : incoming) { ValueVector vvIn = wrapper.getValueVector(); String incomingName = vvIn.getField().getPath().getRootSegment().getPath(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java index b36bd92..49ad390 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java @@ -39,27 +39,25 @@ public abstract class ProjectorTemplate implements Projector { private SelectionVector4 vector4; private SelectionVectorMode svMode; - public ProjectorTemplate() throws SchemaChangeException{ + public ProjectorTemplate() throws SchemaChangeException { } @Override public final int projectRecords(int startIndex, final int recordCount, int firstOutputIndex) { - switch(svMode){ + switch (svMode) { case FOUR_BYTE: throw new UnsupportedOperationException(); - case TWO_BYTE: final int count = recordCount; - for(int i = 0; i < count; i++, firstOutputIndex++){ - if (!doEval(vector2.getIndex(i), firstOutputIndex)) + for (int i = 0; i < count; i++, firstOutputIndex++) { + if (!doEval(vector2.getIndex(i), firstOutputIndex)) { return i; + } } return recordCount; - case NONE: - final int countN = recordCount; int i; for (i = startIndex; i < startIndex + countN; i++, firstOutputIndex++) { @@ -68,18 +66,16 @@ public abstract class ProjectorTemplate implements Projector { } } if (i < startIndex + recordCount || startIndex > 0) { - for(TransferPair t : transfers){ + for (TransferPair t : transfers) { t.splitAndTransfer(startIndex, i - startIndex); } return i - startIndex; } - for(TransferPair t : transfers){ + for (TransferPair t : transfers) { t.transfer(); } return recordCount; - - default: throw new UnsupportedOperationException(); } @@ -89,7 +85,7 @@ public abstract class ProjectorTemplate implements Projector { public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{ this.svMode = incoming.getSchema().getSelectionVectorMode(); - switch(svMode){ + switch (svMode) { case FOUR_BYTE: this.vector4 = incoming.getSelectionVector4(); break; @@ -104,8 +100,4 @@ public abstract class ProjectorTemplate implements Projector { public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - - - - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java index 8116869..419dc85 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java @@ -40,7 +40,7 @@ public class RecordBatchData { private int recordCount; VectorContainer container = new VectorContainer(); - public RecordBatchData(VectorAccessible batch){ + public RecordBatchData(VectorAccessible batch) { List<ValueVector> vectors = Lists.newArrayList(); if (batch instanceof RecordBatch && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) { this.sv2 = ((RecordBatch)batch).getSelectionVector2().clone(); @@ -48,8 +48,10 @@ public class RecordBatchData { this.sv2 = null; } - for(VectorWrapper<?> v : batch){ - if(v.isHyper()) throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch."); + for (VectorWrapper<?> v : batch) { + if (v.isHyper()) { + throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch."); + } TransferPair tp = v.getValueVector().getTransferPair(); tp.transfer(); vectors.add(tp.getTo()); @@ -67,9 +69,10 @@ public class RecordBatchData { container.buildSchema(mode); } - public int getRecordCount(){ + public int getRecordCount() { return recordCount; } + public List<ValueVector> getVectors() { List<ValueVector> vectors = Lists.newArrayList(); for (VectorWrapper w : container) { @@ -91,4 +94,5 @@ public class RecordBatchData { public VectorContainer getContainer() { return container; } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index 3a37491..19f5423 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -82,8 +82,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> { return builder.getSv4(); } - - @Override public void cleanup() { builder.clear(); @@ -93,15 +91,14 @@ public class SortBatch extends AbstractRecordBatch<Sort> { @Override public IterOutcome innerNext() { - if(schema != null){ - if(getSelectionVector4().next()){ + if (schema != null) { + if (getSelectionVector4().next()) { return IterOutcome.OK; - }else{ + } else { return IterOutcome.NONE; } } - try{ outer: while (true) { IterOutcome upstream = incoming.next(); @@ -114,13 +111,15 @@ public class SortBatch extends AbstractRecordBatch<Sort> { return upstream; case OK_NEW_SCHEMA: // only change in the case that the schema truly changes. Artificial schema changes are ignored. - if(!incoming.getSchema().equals(schema)){ - if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas."); + if (!incoming.getSchema().equals(schema)) { + if (schema != null) { + throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas."); + } this.schema = incoming.getSchema(); } // fall through. case OK: - if(!builder.add(incoming)){ + if (!builder.add(incoming)) { throw new UnsupportedOperationException("Sort doesn't currently support doing an external sort."); }; break; @@ -129,7 +128,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { } } - if (schema == null || builder.isEmpty()){ + if (schema == null || builder.isEmpty()) { // builder may be null at this point if the first incoming batch is empty return IterOutcome.NONE; } @@ -141,7 +140,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { return IterOutcome.OK_NEW_SCHEMA; - }catch(SchemaChangeException | ClassTransformationException | IOException ex){ + } catch(SchemaChangeException | ClassTransformationException | IOException ex) { kill(false); logger.error("Failure during query", ex); context.fail(ex); @@ -167,11 +166,13 @@ public class SortBatch extends AbstractRecordBatch<Sort> { ClassGenerator<Sorter> g = cg.getRoot(); g.setMappingSet(mainMapping); - for(Ordering od : orderings){ + for(Ordering od : orderings) { // first, we rewrite the evaluation stack for each side of the comparison. ErrorCollector collector = new ErrorCollectorImpl(); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry()); - if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } g.setMappingSet(leftMapping); HoldingContainer left = g.addExpr(expr, false); g.setMappingSet(rightMapping); @@ -183,7 +184,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { HoldingContainer out = g.addExpr(fh, false); JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - if(od.getDirection() == Direction.ASCENDING){ + if (od.getDirection() == Direction.ASCENDING) { jc._then()._return(out.getValue()); }else{ jc._then()._return(out.getValue().minus()); @@ -193,8 +194,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> { g.getEvalBlock()._return(JExpr.lit(0)); return context.getImplementationClass(cg); - - } @Override @@ -207,7 +206,4 @@ public class SortBatch extends AbstractRecordBatch<Sort> { incoming.kill(sendUpstream); } - - - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java index 80b4ef6..707c41c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java @@ -49,14 +49,14 @@ public class SortRecordBatchBuilder { private SelectionVector4 sv4; final PreAllocator svAllocator; - public SortRecordBatchBuilder(BufferAllocator a, long maxBytes){ + public SortRecordBatchBuilder(BufferAllocator a, long maxBytes) { this.maxBytes = maxBytes; this.svAllocator = a.getNewPreAllocator(); } - private long getSize(VectorAccessible batch){ + private long getSize(VectorAccessible batch) { long bytes = 0; - for(VectorWrapper<?> v : batch){ + for (VectorWrapper<?> v : batch) { bytes += v.getValueVector().getBufferSize(); } return bytes; @@ -68,8 +68,10 @@ public class SortRecordBatchBuilder { * @return True if the requested add completed successfully. Returns false in the case that this builder is full and cannot receive additional packages. * @throws SchemaChangeException */ - public boolean add(VectorAccessible batch){ - if(batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch."); + public boolean add(VectorAccessible batch) { + if (batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) { + throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch."); + } if (batch.getRecordCount() == 0 && batches.size() > 0) { return true; // skip over empty record batches. } @@ -78,9 +80,15 @@ public class SortRecordBatchBuilder { if (batchBytes == 0 && batches.size() > 0) { return true; } - if(batchBytes + runningBytes > maxBytes) return false; // enough data memory. - if(runningBatches+1 > Character.MAX_VALUE) return false; // allowed in batch. - if(!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available. + if (batchBytes + runningBytes > maxBytes) { + return false; // enough data memory. + } + if (runningBatches+1 > Character.MAX_VALUE) { + return false; // allowed in batch. + } + if (!svAllocator.preAllocate(batch.getRecordCount()*4)) { + return false; // sv allocation available. + } RecordBatchData bd = new RecordBatchData(batch); @@ -126,15 +134,19 @@ public class SortRecordBatchBuilder { } } - public boolean isEmpty(){ + public boolean isEmpty() { return batches.isEmpty(); } public void build(FragmentContext context, VectorContainer outputContainer) throws SchemaChangeException{ outputContainer.clear(); - if(batches.keySet().size() > 1) throw new SchemaChangeException("Sort currently only supports a single schema."); - if(batches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE); - if(batches.keys().size() < 1){ + if (batches.keySet().size() > 1) { + throw new SchemaChangeException("Sort currently only supports a single schema."); + } + if (batches.size() > Character.MAX_VALUE) { + throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE); + } + if (batches.keys().size() < 1) { assert false : "Invalid to have an empty set of batches with no schemas."; } sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE); @@ -142,12 +154,12 @@ public class SortRecordBatchBuilder { List<RecordBatchData> data = batches.get(schema); // now we're going to generate the sv4 pointers - switch(schema.getSelectionVectorMode()){ + switch (schema.getSelectionVectorMode()) { case NONE: { int index = 0; int recordBatchId = 0; - for(RecordBatchData d : data){ - for(int i =0; i < d.getRecordCount(); i++, index++){ + for (RecordBatchData d : data) { + for (int i =0; i < d.getRecordCount(); i++, index++) { sv4.set(index, recordBatchId, i); } recordBatchId++; @@ -157,8 +169,8 @@ public class SortRecordBatchBuilder { case TWO_BYTE: { int index = 0; int recordBatchId = 0; - for(RecordBatchData d : data){ - for(int i =0; i < d.getRecordCount(); i++, index++){ + for (RecordBatchData d : data) { + for (int i =0; i < d.getRecordCount(); i++, index++) { sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i)); } // might as well drop the selection vector since we'll stop using it now. @@ -173,13 +185,13 @@ public class SortRecordBatchBuilder { // next, we'll create lists of each of the vector types. ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create(); - for(RecordBatchData rbd : batches.values()){ - for(ValueVector v : rbd.getVectors()){ + for (RecordBatchData rbd : batches.values()) { + for (ValueVector v : rbd.getVectors()) { vectors.put(v.getField(), v); } } - for(MaterializedField f : schema){ + for (MaterializedField f : schema) { List<ValueVector> v = vectors.get(f); outputContainer.addHyperList(v, false); } @@ -191,11 +203,13 @@ public class SortRecordBatchBuilder { return sv4; } - public void clear(){ - for(RecordBatchData d : batches.values()){ + public void clear() { + for (RecordBatchData d : batches.values()) { d.container.clear(); } - if(sv4 != null) sv4.clear(); + if (sv4 != null) { + sv4.clear(); + } } public List<VectorContainer> getHeldRecordBatches() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java index 609cb29..6d90962 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java @@ -88,10 +88,11 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { @Override public int getRecordCount() { - if (sv == null) + if (sv == null) { return incoming.getRecordCount(); - else + } else { return sv.getCount(); + } } /** @@ -125,8 +126,9 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { @Override protected void setupNewSchema() throws SchemaChangeException { /* Trace operator does not deal with hyper vectors yet */ - if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) + if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) { throw new SchemaChangeException("Trace operator does not work with hyper vectors"); + } /* * we have a new schema, clear our existing container to load the new value vectors @@ -152,8 +154,9 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { @Override public void cleanup() { /* Release the selection vector */ - if (sv != null) + if (sv != null) { sv.clear(); + } /* Close the file descriptors */ try { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index 0e69bcf..171d12c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -111,7 +111,9 @@ public class IteratorValidatorBatchIterator implements RecordBatch { @Override public IterOutcome next() { - if(state == IterOutcome.NONE ) throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again."); + if (state == IterOutcome.NONE ) { + throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again."); + } state = incoming.next(); if (first && state == IterOutcome.NONE) { throw new IllegalStateException("The incoming iterator returned a state of NONE on the first batch. There should always be at least one batch output before returning NONE"); @@ -119,14 +121,16 @@ public class IteratorValidatorBatchIterator implements RecordBatch { if (first && state == IterOutcome.OK) { throw new IllegalStateException("The incoming iterator returned a state of OK on the first batch. There should always be a new schema on the first batch. Incoming: " + incoming.getClass().getName()); } - if (first) first = !first; + if (first) { + first = !first; + } - if(state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) { + if (state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) { BatchSchema schema = incoming.getSchema(); - if(schema.getFieldCount() == 0){ + if (schema.getFieldCount() == 0) { throw new IllegalStateException ("Incoming batch has an empty schema. This is not allowed."); } - if(incoming.getRecordCount() > MAX_BATCH_SIZE){ + if (incoming.getRecordCount() > MAX_BATCH_SIZE) { throw new IllegalStateException (String.format("Incoming batch of %s has size %d, which is beyond the limit of %d", incoming.getClass().getName(), incoming.getRecordCount(), MAX_BATCH_SIZE)); } @@ -157,4 +161,5 @@ public class IteratorValidatorBatchIterator implements RecordBatch { public VectorContainer getOutgoingContainer() { throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java index 428f335..2f7f531 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java @@ -36,10 +36,11 @@ public class IteratorValidatorInjector extends IteratorValidatorInjector inject = new IteratorValidatorInjector(); PhysicalOperator newOp = root.accept(inject, context); - if( !(newOp instanceof FragmentRoot) ) throw new IllegalStateException("This shouldn't happen."); + if ( !(newOp instanceof FragmentRoot) ) { + throw new IllegalStateException("This shouldn't happen."); + } return (FragmentRoot) newOp; - } /** @@ -67,12 +68,11 @@ public class IteratorValidatorInjector extends } /* Inject trace operator */ - if (newChildren.size() > 0){ + if (newChildren.size() > 0) { newOp = op.getNewWithChildren(newChildren); newOp.setOperatorId(op.getOperatorId()); } - return newOp; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java index 2370070..9359ea1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java @@ -143,14 +143,24 @@ public class BatchGroup implements VectorAccessible { } public void cleanup() throws IOException { - if (sv2 != null) sv2.clear(); - if (outputStream != null) outputStream.close(); - if (inputStream != null) inputStream.close(); - if (fs != null && fs.exists(path)) fs.delete(path, false); + if (sv2 != null) { + sv2.clear(); + } + if (outputStream != null) { + outputStream.close(); + } + if (inputStream != null) { + inputStream.close(); + } + if (fs != null && fs.exists(path)) { + fs.delete(path, false); + } } public void closeOutputStream() throws IOException { - if (outputStream != null) outputStream.close(); + if (outputStream != null) { + outputStream.close(); + } } @Override @@ -181,4 +191,5 @@ public class BatchGroup implements VectorAccessible { public Iterator<VectorWrapper<?>> iterator() { return currentContainer.iterator(); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 505f567..52249e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -192,12 +192,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { @Override public IterOutcome innerNext() { - if(schema != null){ + if (schema != null) { if (spillCount == 0) { - if(schema != null){ - if(getSelectionVector4().next()){ + if (schema != null) { + if (getSelectionVector4().next()) { return IterOutcome.OK; - }else{ + } else { return IterOutcome.NONE; } } @@ -206,12 +206,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { w.start(); // int count = selector.next(); int count = copier.next(targetRecordCount); - if(count > 0){ + if (count > 0) { long t = w.elapsed(TimeUnit.MICROSECONDS); logger.debug("Took {} us to merge {} records", t, count); container.setRecordCount(count); return IterOutcome.OK; - }else{ + } else { logger.debug("copier returned 0 records"); return IterOutcome.NONE; } @@ -236,8 +236,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { return upstream; case OK_NEW_SCHEMA: // only change in the case that the schema truly changes. Artificial schema changes are ignored. - if(!incoming.getSchema().equals(schema)){ - if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas."); + if (!incoming.getSchema().equals(schema)) { + if (schema != null) { + throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas."); + } this.schema = incoming.getSchema(); this.sorter = createNewSorter(context, incoming); } @@ -249,7 +251,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } break; } - if (first) first = false; + if (first) { + first = false; + } totalSizeInMemory += getBufferSize(incoming); SelectionVector2 sv2; if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) { @@ -291,7 +295,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { break; case OUT_OF_MEMORY: highWaterMark = totalSizeInMemory; - if (batchesSinceLastSpill > 2) mergeAndSpill(); + if (batchesSinceLastSpill > 2) { + mergeAndSpill(); + } batchesSinceLastSpill = 0; break; default: @@ -348,7 +354,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { return IterOutcome.OK_NEW_SCHEMA; - }catch(SchemaChangeException | ClassTransformationException | IOException ex){ + } catch(SchemaChangeException | ClassTransformationException | IOException ex) { kill(false); logger.error("Failure during query", ex); context.fail(ex); @@ -502,11 +508,13 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { ClassGenerator<MSorter> g = cg.getRoot(); g.setMappingSet(mainMapping); - for(Ordering od : orderings){ + for (Ordering od : orderings) { // first, we rewrite the evaluation stack for each side of the comparison. ErrorCollector collector = new ErrorCollectorImpl(); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry()); - if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } g.setMappingSet(leftMapping); HoldingContainer left = g.addExpr(expr, false); g.setMappingSet(rightMapping); @@ -518,7 +526,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { HoldingContainer out = g.addExpr(fh, false); JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - if(od.getDirection() == Direction.ASCENDING){ + if (od.getDirection() == Direction.ASCENDING) { jc._then()._return(out.getValue()); }else{ jc._then()._return(out.getValue().minus()); @@ -547,11 +555,13 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException { g.setMappingSet(MAIN_MAPPING); - for(Ordering od : popConfig.getOrderings()){ + for (Ordering od : popConfig.getOrderings()) { // first, we rewrite the evaluation stack for each side of the comparison. ErrorCollector collector = new ErrorCollectorImpl(); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry()); - if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } g.setMappingSet(LEFT_MAPPING); HoldingContainer left = g.addExpr(expr, false); g.setMappingSet(RIGHT_MAPPING); @@ -563,7 +573,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { HoldingContainer out = g.addExpr(fh, false); JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - if(od.getDirection() == Direction.ASCENDING){ + if (od.getDirection() == Direction.ASCENDING) { jc._then()._return(out.getValue()); }else{ jc._then()._return(out.getValue().minus()); @@ -590,7 +600,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } List<VectorAllocator> allocators = Lists.newArrayList(); - for(VectorWrapper<?> i : batch){ + for (VectorWrapper<?> i : batch) { ValueVector v = TypeHelper.getNewVector(i.getField(), copierAllocator); outputContainer.add(v); allocators.add(VectorAllocator.getAllocator(v, 110)); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java index df79b1a..3fd744f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java @@ -84,7 +84,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{ while (l < rightStart) { aux.set(o++, vector4.get(l++)); } - while (r < rightEnd){ + while (r < rightEnd) { aux.set(o++, vector4.get(r++)); } assert o == outStart + (rightEnd - leftStart); @@ -97,7 +97,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{ } @Override - public void sort(VectorContainer container){ + public void sort(VectorContainer container) { Stopwatch watch = new Stopwatch(); watch.start(); while (runStarts.size() > 1) { @@ -109,9 +109,13 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{ int left = runStarts.poll(); int right = runStarts.poll(); Integer end = runStarts.peek(); - if (end == null) end = vector4.getTotalCount(); + if (end == null) { + end = vector4.getTotalCount(); + } outIndex = merge(left, right, end, outIndex); - if (outIndex < vector4.getTotalCount()) newRunStarts.add(outIndex); + if (outIndex < vector4.getTotalCount()) { + newRunStarts.add(outIndex); + } } if (outIndex < vector4.getTotalCount()) { copyRun(outIndex, vector4.getTotalCount()); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java index aa2f786..9beef39 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/StarColumnHelper.java @@ -35,8 +35,9 @@ public class StarColumnHelper { List<String> fieldNames = type.getFieldNames(); for (String s : fieldNames) { - if (s.startsWith(STAR_COLUMN)) + if (s.startsWith(STAR_COLUMN)) { return true; + } } return false; @@ -71,8 +72,9 @@ public class StarColumnHelper { // Given a set of prefixes, check if a regular column is subsumed by any of the prefixed star column in the set. public static boolean subsumeRegColumn(Set<String> prefixes, String fieldName) { - if (isPrefixedStarColumn(fieldName)) + if (isPrefixedStarColumn(fieldName)) { return false; // only applies to regular column. + } return prefixes.contains(extractColumnPrefix(fieldName)); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java index c33bb22..87a1ea3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java @@ -130,25 +130,25 @@ public class DrillCostBase implements DrillRelOptCost { this.memory = memory; } - @Override - public double getRows() { - return rowCount; - } - - @Override - public double getCpu() { - return cpu; - } - - @Override - public double getIo() { - return io; - } - - @Override - public double getNetwork() { - return network; - } + @Override + public double getRows() { + return rowCount; + } + + @Override + public double getCpu() { + return cpu; + } + + @Override + public double getIo() { + return io; + } + + @Override + public double getNetwork() { + return network; + } public double getMemory() { return memory; @@ -159,31 +159,31 @@ public class DrillCostBase implements DrillRelOptCost { return Util.hashCode(rowCount) + Util.hashCode(cpu) + Util.hashCode(io) + Util.hashCode(network); } - @Override - public boolean isInfinite() { + @Override + public boolean isInfinite() { return (this == INFINITY) || (this.cpu == Double.POSITIVE_INFINITY) || (this.io == Double.POSITIVE_INFINITY) || (this.network == Double.POSITIVE_INFINITY) || (this.rowCount == Double.POSITIVE_INFINITY); - } - - @Override - public boolean equals(RelOptCost other) { - // here we compare the individual components similar to VolcanoCost, however - // an alternative would be to add up the components and compare the total. - // Note that VolcanoPlanner mainly uses isLe() and isLt() for cost comparisons, - // not equals(). + } + + @Override + public boolean equals(RelOptCost other) { + // here we compare the individual components similar to VolcanoCost, however + // an alternative would be to add up the components and compare the total. + // Note that VolcanoPlanner mainly uses isLe() and isLt() for cost comparisons, + // not equals(). return this == other || (other instanceof DrillCostBase && (this.cpu == ((DrillCostBase) other).cpu) && (this.io == ((DrillCostBase) other).io) && (this.network == ((DrillCostBase) other).network) && (this.rowCount == ((DrillCostBase) other).rowCount)); - } + } - @Override - public boolean isEqWithEpsilon(RelOptCost other) { + @Override + public boolean isEqWithEpsilon(RelOptCost other) { if (!(other instanceof DrillCostBase)) { return false; } @@ -193,7 +193,7 @@ public class DrillCostBase implements DrillRelOptCost { && (Math.abs(this.io - that.io) < RelOptUtil.EPSILON) && (Math.abs(this.network - that.network) < RelOptUtil.EPSILON) && (Math.abs(this.rowCount - that.rowCount) < RelOptUtil.EPSILON)); - } + } @Override public boolean isLe(RelOptCost other) { @@ -216,8 +216,8 @@ public class DrillCostBase implements DrillRelOptCost { ); } - @Override - public RelOptCost plus(RelOptCost other) { + @Override + public RelOptCost plus(RelOptCost other) { DrillCostBase that = (DrillCostBase) other; if ((this == INFINITY) || (that == INFINITY)) { return INFINITY; @@ -228,10 +228,10 @@ public class DrillCostBase implements DrillRelOptCost { this.io + that.io, this.network + that.network, this.memory + that.memory); - } + } - @Override - public RelOptCost minus(RelOptCost other) { + @Override + public RelOptCost minus(RelOptCost other) { if (this == INFINITY) { return this; } @@ -242,18 +242,18 @@ public class DrillCostBase implements DrillRelOptCost { this.io - that.io, this.network - that.network, this.memory - that.memory); - } + } - @Override - public RelOptCost multiplyBy(double factor) { + @Override + public RelOptCost multiplyBy(double factor) { if (this == INFINITY) { return this; } return new DrillCostBase(rowCount * factor, cpu * factor, io * factor, network * factor); - } + } - @Override - public double divideBy(RelOptCost cost) { + @Override + public double divideBy(RelOptCost cost) { // Compute the geometric average of the ratios of all of the factors // which are non-zero and finite. DrillCostBase that = (DrillCostBase) cost; @@ -292,7 +292,7 @@ public class DrillCostBase implements DrillRelOptCost { return 1.0; } return Math.pow(d, 1 / n); - } + } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java index 88e4e28..73c6c72 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java @@ -22,7 +22,6 @@ import org.eigenbase.relopt.RelOptCost; public interface DrillRelOptCost extends RelOptCost { - double getNetwork(); + double getNetwork(); } - http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java index 47d6f14..e527960 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java @@ -26,7 +26,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import com.google.common.collect.Lists; -public class Fragment implements Iterable<Fragment.ExchangeFragmentPair>{ +public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fragment.class); private PhysicalOperator root; @@ -34,19 +34,21 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair>{ private final List<ExchangeFragmentPair> receivingExchangePairs = Lists.newLinkedList(); private Stats stats = new Stats(); - public void addOperator(PhysicalOperator o){ - if(root == null){ + public void addOperator(PhysicalOperator o) { + if (root == null) { root = o; } } public void addSendExchange(Exchange e) throws FragmentSetupException{ - if(sendingExchange != null) throw new FragmentSetupException("Fragment was trying to add a second SendExchange. "); + if (sendingExchange != null) { + throw new FragmentSetupException("Fragment was trying to add a second SendExchange. "); + } addOperator(e); sendingExchange = e; } - public void addReceiveExchange(Exchange e, Fragment fragment){ + public void addReceiveExchange(Exchange e, Fragment fragment) { this.receivingExchangePairs.add(new ExchangeFragmentPair(e, fragment)); } @@ -67,28 +69,32 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair>{ return sendingExchange; } -// public <T, V> T accept(FragmentVisitor<T, V> visitor, V extra){ +// public <T, V> T accept(FragmentVisitor<T, V> visitor, V extra) { // return visitor.visit(this, extra); // } - public Stats getStats(){ + public Stats getStats() { return stats; } public class ExchangeFragmentPair { private Exchange exchange; private Fragment node; + public ExchangeFragmentPair(Exchange exchange, Fragment node) { super(); this.exchange = exchange; this.node = node; } + public Exchange getExchange() { return exchange; } + public Fragment getNode() { return node; } + @Override public int hashCode() { final int prime = 31; @@ -97,13 +103,12 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair>{ result = prime * result + ((node == null) ? 0 : node.hashCode()); return result; } + @Override public String toString() { return "ExchangeFragmentPair [exchange=" + exchange + "]"; } - - } @Override @@ -119,22 +124,44 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair>{ @Override public boolean equals(Object obj) { - if (this == obj) return true; - if (obj == null) return false; - if (getClass() != obj.getClass()) return false; + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } Fragment other = (Fragment) obj; if (receivingExchangePairs == null) { - if (other.receivingExchangePairs != null) return false; - } else if (!receivingExchangePairs.equals(other.receivingExchangePairs)) return false; + if (other.receivingExchangePairs != null) { + return false; + } + } else if (!receivingExchangePairs.equals(other.receivingExchangePairs)) { + return false; + } if (root == null) { - if (other.root != null) return false; - } else if (!root.equals(other.root)) return false; + if (other.root != null) { + return false; + } + } else if (!root.equals(other.root)) { + return false; + } if (sendingExchange == null) { - if (other.sendingExchange != null) return false; - } else if (!sendingExchange.equals(other.sendingExchange)) return false; + if (other.sendingExchange != null) { + return false; + } + } else if (!sendingExchange.equals(other.sendingExchange)) { + return false; + } if (stats == null) { - if (other.stats != null) return false; - } else if (!stats.equals(other.stats)) return false; + if (other.stats != null) { + return false; + } + } else if (!stats.equals(other.stats)) { + return false; + } return true; } @@ -144,6 +171,4 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair>{ + receivingExchangePairs + ", stats=" + stats + "]"; } - - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java index 690fe45..594356a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java @@ -30,13 +30,15 @@ public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Frag static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MakeFragmentsVisitor.class); - public MakeFragmentsVisitor(){ + public MakeFragmentsVisitor() { } @Override public Fragment visitExchange(Exchange exchange, Fragment value) throws FragmentSetupException { // logger.debug("Visiting Exchange {}", exchange); - if(value == null) throw new FragmentSetupException("The simple fragmenter was called without a FragmentBuilder value. This will only happen if the initial call to SimpleFragmenter is by a Exchange node. This should never happen since an Exchange node should never be the root node of a plan."); + if (value == null) { + throw new FragmentSetupException("The simple fragmenter was called without a FragmentBuilder value. This will only happen if the initial call to SimpleFragmenter is by a Exchange node. This should never happen since an Exchange node should never be the root node of a plan."); + } Fragment next = getNextBuilder(); value.addReceiveExchange(exchange, next); next.addSendExchange(exchange); @@ -55,21 +57,21 @@ public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Frag // logger.debug("Visiting Other {}", op); value = ensureBuilder(value); value.addOperator(op); - for(PhysicalOperator child : op){ + for (PhysicalOperator child : op) { child.accept(this, value); } return value; } private Fragment ensureBuilder(Fragment value) throws FragmentSetupException{ - if(value != null){ + if (value != null) { return value; - }else{ + } else { return getNextBuilder(); } } - public Fragment getNextBuilder(){ + public Fragment getNextBuilder() { return new Fragment(); }