http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java index 91d2037..195d249 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java @@ -21,8 +21,6 @@ import java.io.IOException; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; -import org.apache.drill.common.expression.ExpressionPosition; -import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.types.TypeProtos.MinorType; @@ -35,12 +33,10 @@ import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; -import org.apache.drill.exec.expr.HoldingContainerExpression; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.ValueVectorReadExpression; import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; -import org.apache.drill.exec.expr.fn.impl.BitFunctions; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.MaterializedField; @@ -48,9 +44,7 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; -import com.google.common.collect.ImmutableList; import com.sun.codemodel.JConditional; import com.sun.codemodel.JExpr; @@ -58,40 +52,40 @@ import com.sun.codemodel.JExpr; public class ChainedHashTable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ChainedHashTable.class); - private static final GeneratorMapping KEY_MATCH_BUILD = - GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalBuild" /* eval method */, + private static final GeneratorMapping KEY_MATCH_BUILD = + GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalBuild" /* eval method */, null /* reset */, null /* cleanup */); - private static final GeneratorMapping KEY_MATCH_PROBE = - GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalProbe" /* eval method */, + private static final GeneratorMapping KEY_MATCH_PROBE = + GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalProbe" /* eval method */, null /* reset */, null /* cleanup */); - private static final GeneratorMapping GET_HASH_BUILD = - GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */, + private static final GeneratorMapping GET_HASH_BUILD = + GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */, null /* reset */, null /* cleanup */); - private static final GeneratorMapping GET_HASH_PROBE = - GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */, + private static final GeneratorMapping GET_HASH_PROBE = + GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */, null /* reset */, null /* cleanup */); - private static final GeneratorMapping SET_VALUE = - GeneratorMapping.create("setupInterior" /* setup method */, "setValue" /* eval method */, + private static final GeneratorMapping SET_VALUE = + GeneratorMapping.create("setupInterior" /* setup method */, "setValue" /* eval method */, null /* reset */, null /* cleanup */); - - private static final GeneratorMapping OUTPUT_KEYS = + + private static final GeneratorMapping OUTPUT_KEYS = GeneratorMapping.create("setupInterior" /* setup method */, "outputRecordKeys" /* eval method */, null /* reset */, null /* cleanup */) ; // GM for putting constant expression into method "setupInterior" - private static final GeneratorMapping SETUP_INTERIOR_CONSTANT = - GeneratorMapping.create("setupInterior" /* setup method */, "setupInterior" /* eval method */, + private static final GeneratorMapping SETUP_INTERIOR_CONSTANT = + GeneratorMapping.create("setupInterior" /* setup method */, "setupInterior" /* eval method */, null /* reset */, null /* cleanup */); - // GM for putting constant expression into method "doSetup" - private static final GeneratorMapping DO_SETUP_CONSTANT = - GeneratorMapping.create("doSetup" /* setup method */, "doSetup" /* eval method */, + // GM for putting constant expression into method "doSetup" + private static final GeneratorMapping DO_SETUP_CONSTANT = + GeneratorMapping.create("doSetup" /* setup method */, "doSetup" /* eval method */, null /* reset */, null /* cleanup */); - + private final MappingSet KeyMatchIncomingBuildMapping = new MappingSet("incomingRowIdx", null, "incomingBuild", null, SETUP_INTERIOR_CONSTANT, KEY_MATCH_BUILD); private final MappingSet KeyMatchIncomingProbeMapping = new MappingSet("incomingRowIdx", null, "incomingProbe", null, SETUP_INTERIOR_CONSTANT, KEY_MATCH_PROBE); private final MappingSet KeyMatchHtableMapping = new MappingSet("htRowIdx", null, "htContainer", null, SETUP_INTERIOR_CONSTANT, KEY_MATCH_BUILD); @@ -109,10 +103,10 @@ public class ChainedHashTable { private final RecordBatch incomingProbe; private final RecordBatch outgoing; - public ChainedHashTable(HashTableConfig htConfig, + public ChainedHashTable(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, - RecordBatch incomingBuild, + RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing) { @@ -135,31 +129,31 @@ public class ChainedHashTable { if (isProbe) { keyExprsProbe = new LogicalExpression[htConfig.getKeyExprsProbe().length]; } - + ErrorCollector collector = new ErrorCollectorImpl(); VectorContainer htContainerOrig = new VectorContainer(); // original ht container from which others may be cloned LogicalExpression[] htKeyExprs = new LogicalExpression[htConfig.getKeyExprsBuild().length]; TypedFieldId[] htKeyFieldIds = new TypedFieldId[htConfig.getKeyExprsBuild().length]; int i = 0; - for (NamedExpression ne : htConfig.getKeyExprsBuild()) { + for (NamedExpression ne : htConfig.getKeyExprsBuild()) { final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingBuild, collector, context.getFunctionRegistry()); if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); if (expr == null) continue; keyExprsBuild[i] = expr; - + final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); // create a type-specific ValueVector for this key ValueVector vv = TypeHelper.getNewVector(outputField, allocator); vv.allocateNew(); htKeyFieldIds[i] = htContainerOrig.add(vv); - + i++; } if (isProbe) { i = 0; - for (NamedExpression ne : htConfig.getKeyExprsProbe()) { + for (NamedExpression ne : htConfig.getKeyExprsProbe()) { final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe, collector, context.getFunctionRegistry()); if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); if (expr == null) continue; @@ -189,9 +183,9 @@ public class ChainedHashTable { return ht; } - - private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping, + + private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping, LogicalExpression[] keyExprs, TypedFieldId[] htKeyFieldIds) throws SchemaChangeException { cg.setMappingSet(incomingMapping); @@ -202,10 +196,10 @@ public class ChainedHashTable { } int i = 0; - for (LogicalExpression expr : keyExprs) { + for (LogicalExpression expr : keyExprs) { cg.setMappingSet(incomingMapping); HoldingContainer left = cg.addExpr(expr, false); - + cg.setMappingSet(htableMapping); ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i++]); HoldingContainer right = cg.addExpr(vvrExpr, false); @@ -216,7 +210,7 @@ public class ChainedHashTable { // check if two values are not equal (comparator result != 0) JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - + jc._then()._return(JExpr.FALSE); } @@ -224,7 +218,7 @@ public class ChainedHashTable { cg.getEvalBlock()._return(JExpr.TRUE); } - private void setupSetValue(ClassGenerator<HashTable> cg, LogicalExpression[] keyExprs, TypedFieldId[] htKeyFieldIds) + private void setupSetValue(ClassGenerator<HashTable> cg, LogicalExpression[] keyExprs, TypedFieldId[] htKeyFieldIds) throws SchemaChangeException { cg.setMappingSet(SetValueMapping); @@ -234,7 +228,7 @@ public class ChainedHashTable { ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(htKeyFieldIds[i++], expr, true) ; HoldingContainer hc = cg.addExpr(vvwExpr, false); // this will write to the htContainer at htRowIdx - cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); + cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); } cg.getEvalBlock()._return(JExpr.TRUE); @@ -267,22 +261,22 @@ public class ChainedHashTable { cg.getEvalBlock()._return(JExpr.lit(0)); return; } - + HoldingContainer combinedHashValue = null; for (int i = 0; i < keyExprs.length; i++) { LogicalExpression expr = keyExprs[i]; - + cg.setMappingSet(incomingMapping); HoldingContainer input = cg.addExpr(expr, false); // compute the hash(expr) - LogicalExpression hashfunc = FunctionGenerationHelper.getFunctionExpression("hash", Types.required(MinorType.INT), context.getFunctionRegistry(), input); + LogicalExpression hashfunc = FunctionGenerationHelper.getFunctionExpression("hash", Types.required(MinorType.INT), context.getFunctionRegistry(), input); HoldingContainer hashValue = cg.addExpr(hashfunc, false); if (i == 0) { - combinedHashValue = hashValue; // first expression..just use the hash value - } + combinedHashValue = hashValue; // first expression..just use the hash value + } else { // compute the combined hash value using XOR
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java index d9499c7..6028a04 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java @@ -17,10 +17,7 @@ */ package org.apache.drill.exec.physical.impl.common; -import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.exec.compile.TemplateClassDefinition; -import org.apache.drill.exec.expr.holders.BitHolder; -import org.apache.drill.exec.expr.holders.IntHolder; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java index 98892c0..fa6d4b5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java @@ -34,8 +34,8 @@ public class HashTableConfig { private final NamedExpression[] keyExprsProbe; @JsonCreator - public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity, @JsonProperty("loadFactor") float loadFactor, - @JsonProperty("keyExprsBuild") NamedExpression[] keyExprsBuild, + public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity, @JsonProperty("loadFactor") float loadFactor, + @JsonProperty("keyExprsBuild") NamedExpression[] keyExprsBuild, @JsonProperty("keyExprsProbe") NamedExpression[] keyExprsProbe) { this.initialCapacity = initialCapacity; this.loadFactor = loadFactor; @@ -47,15 +47,15 @@ public class HashTableConfig { return initialCapacity; } - public float getLoadFactor() { + public float getLoadFactor() { return loadFactor; } - public NamedExpression[] getKeyExprsBuild() { + public NamedExpression[] getKeyExprsBuild() { return keyExprsBuild; } - public NamedExpression[] getKeyExprsProbe() { + public NamedExpression[] getKeyExprsProbe() { return keyExprsProbe; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index e6c55bd..b03880c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -23,18 +23,17 @@ import java.util.Iterator; import javax.inject.Named; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.types.Types; import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.compile.sig.RuntimeOverridden; +import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.compile.sig.RuntimeOverridden; -import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.expr.holders.IntHolder; -import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.vector.BigIntVector; import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.ValueVector; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java index dd0e263..dc0abd9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java @@ -21,10 +21,10 @@ import org.apache.drill.exec.record.selection.SelectionVector2; public class EvaluationPredicate { private SelectionVector2 vector; - + EvaluationPredicate(String pred){ - + } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java index c5c81c6..7f2fe8e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.physical.impl.filter; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Filter; import org.apache.drill.exec.physical.impl.BatchCreator; @@ -36,6 +35,6 @@ public class FilterBatchCreator implements BatchCreator<Filter>{ Preconditions.checkArgument(children.size() == 1); return new FilterRecordBatch(config, children.iterator().next(), context); } - - + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index 3ece98b..bf00194 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -40,7 +40,6 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; import com.google.common.collect.Lists; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java index 39519b4..74a5d16 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java @@ -24,8 +24,8 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; public interface FilterSignature extends CodeGeneratorSignature{ - + public void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); public boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java index 5b0b88f..bd85418 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java @@ -28,7 +28,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2; public abstract class FilterTemplate2 implements Filterer{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate2.class); - + private SelectionVector2 outgoingSelectionVector; private SelectionVector2 incomingSelectionVector; private SelectionVectorMode svMode; @@ -39,7 +39,7 @@ public abstract class FilterTemplate2 implements Filterer{ this.transfers = transfers; this.outgoingSelectionVector = outgoing.getSelectionVector2(); this.svMode = incoming.getSchema().getSelectionVectorMode(); - + switch(svMode){ case NONE: break; @@ -58,7 +58,7 @@ public abstract class FilterTemplate2 implements Filterer{ t.transfer(); } } - + public void filterBatch(int recordCount){ if (! outgoingSelectionVector.allocateNew(recordCount)) { throw new UnsupportedOperationException("Unable to allocate filter batch"); @@ -75,7 +75,7 @@ public abstract class FilterTemplate2 implements Filterer{ } doTransfers(); } - + private void filterBatchSV2(int recordCount){ int svIndex = 0; final int count = recordCount; @@ -99,7 +99,7 @@ public abstract class FilterTemplate2 implements Filterer{ } outgoingSelectionVector.setRecordCount(svIndex); } - + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java index cfb1f5b..a1769b9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java @@ -17,15 +17,13 @@ */ package org.apache.drill.exec.physical.impl.filter; +import javax.inject.Named; + import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.vector.allocator.VectorAllocator; - -import javax.inject.Named; public abstract class FilterTemplate4 implements Filterer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate4.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java index 8e8cb2e..fd7a13f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java @@ -25,10 +25,10 @@ import org.apache.drill.exec.record.TransferPair; public interface Filterer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Filterer.class); - + public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException; public void filterBatch(int recordCount); - + public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class); public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java index 1aa3006..c81fb2c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java @@ -22,8 +22,8 @@ import java.util.Iterator; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.visitors.ExprVisitor; -import org.apache.drill.common.types.Types; import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.Types; import com.google.common.collect.Iterators; @@ -32,11 +32,11 @@ public class ReturnValueExpression implements LogicalExpression{ private LogicalExpression child; private boolean returnTrueOnOne; - + public ReturnValueExpression(LogicalExpression child) { this(child, true); } - + public ReturnValueExpression(LogicalExpression child, boolean returnTrueOnOne) { this.child = child; this.returnTrueOnOne = returnTrueOnOne; @@ -60,7 +60,7 @@ public class ReturnValueExpression implements LogicalExpression{ public ExpressionPosition getPosition() { return ExpressionPosition.UNKNOWN; } - + @Override public Iterator<LogicalExpression> iterator() { return Iterators.singletonIterator(child); @@ -70,12 +70,12 @@ public class ReturnValueExpression implements LogicalExpression{ return returnTrueOnOne; } - public int getSelfCost() { - throw new UnsupportedOperationException(String.format("The type of %s doesn't currently support LogicalExpression.getSelfCost().", this.getClass().getCanonicalName())); + public int getSelfCost() { + throw new UnsupportedOperationException(String.format("The type of %s doesn't currently support LogicalExpression.getSelfCost().", this.getClass().getCanonicalName())); } - - public int getCumulativeCost() { - throw new UnsupportedOperationException(String.format("The type of %s doesn't currently support LogicalExpression.getCumulativeCost().", this.getClass().getCanonicalName())); + + public int getCumulativeCost() { + throw new UnsupportedOperationException(String.format("The type of %s doesn't currently support LogicalExpression.getCumulativeCost().", this.getClass().getCanonicalName())); } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java index d925958..bfe89c0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.physical.impl.join; -import com.google.common.base.Preconditions; +import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; @@ -25,7 +25,7 @@ import org.apache.drill.exec.physical.config.HashJoinPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; -import java.util.List; +import com.google.common.base.Preconditions; public class HashJoinBatchCreator implements BatchCreator<HashJoinPOP> { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java index aec0f31..a3c33ed 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java @@ -18,17 +18,17 @@ package org.apache.drill.exec.physical.impl.join; +import io.netty.buffer.ByteBuf; + import java.util.ArrayList; import java.util.BitSet; import java.util.List; -import io.netty.buffer.ByteBuf; - import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.physical.impl.common.HashTable; +import org.apache.drill.exec.record.selection.SelectionVector4; /* @@ -71,7 +71,7 @@ public class HashJoinHelper { public static final int LEFT_INPUT = 0; public static final int RIGHT_INPUT = 1; - + public HashJoinHelper(FragmentContext context, BufferAllocator allocator) { this.context = context; this.allocator = allocator; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java index ae70339..7599f9e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java @@ -18,20 +18,17 @@ package org.apache.drill.exec.physical.impl.join; +import java.io.IOException; + import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.common.HashTable; -import org.apache.drill.exec.physical.impl.common.HashTableConfig; -import org.apache.drill.exec.record.ExpandableHyperContainer; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorContainer; import org.eigenbase.rel.JoinRelType; -import java.io.IOException; - public interface HashJoinProbe { public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java index 0b90362..785deae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java @@ -17,23 +17,21 @@ */ package org.apache.drill.exec.physical.impl.join; +import java.io.IOException; +import java.util.List; + import javax.inject.Named; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.common.HashTable; -import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.AbstractRecordBatch; - +import org.apache.drill.exec.record.VectorWrapper; import org.eigenbase.rel.JoinRelType; -import java.io.IOException; -import java.util.List; - public abstract class HashJoinProbeTemplate implements HashJoinProbe { // Probe side record batch http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java index c704a8a..bb3b9ac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.join; import javax.inject.Named; -import org.apache.drill.common.logical.data.Join; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.MergeJoinPOP; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java index 8643d66..e4e13d1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java @@ -24,14 +24,14 @@ import org.apache.drill.exec.record.VectorContainer; public interface JoinWorker { - + public static enum JoinOutcome { NO_MORE_DATA, BATCH_RETURNED, SCHEMA_CHANGED, WAITING, FAILURE; } public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException; public boolean doJoin(JoinStatus status); - + public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class); - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 7a6273c..b24b534 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -505,7 +505,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { HoldingContainer out = cg.addExpr(fh, false); // If not 0, it means not equal. We return this out value. - // Null compares to Null should returns null (unknown). In such case, we return 1 to indicate they are not equal. + // Null compares to Null should returns null (unknown). In such case, we return 1 to indicate they are not equal. if (compareLeftExprHolder.isOptional() && compareRightExprHolder.isOptional()) { JConditional jc = cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)). cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0)))); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java index 3549a33..d6b566c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java @@ -37,8 +37,8 @@ public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> { if(config.getJoinType() == JoinRelType.RIGHT){ return new MergeJoinBatch(config.flipIfRight(), context, children.get(1), children.get(0)); }else{ - return new MergeJoinBatch(config, context, children.get(0), children.get(1)); + return new MergeJoinBatch(config, context, children.get(0), children.get(1)); } - + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java index ccbf755..e71daba 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java @@ -17,14 +17,15 @@ */ package org.apache.drill.exec.physical.impl.limit; -import com.google.common.collect.Iterables; +import java.util.List; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; -import java.util.List; +import com.google.common.collect.Iterables; public class LimitBatchCreator implements BatchCreator<Limit> { @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index 32eb709..f5bc9f9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -23,13 +23,11 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Limit; -import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch; import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.selection.SelectionVector2; import com.google.common.collect.Lists; @@ -37,7 +35,7 @@ import com.google.common.collect.Lists; public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class); - + private SelectionVector2 outgoingSv; private SelectionVector2 incomingSv; private int recordsToSkip; @@ -96,7 +94,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { if (first) { return produceEmptyFirstBatch(); } - + incoming.kill(true); IterOutcome upStream = incoming.next(); @@ -159,7 +157,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { incoming.kill(true); return IterOutcome.OK_NEW_SCHEMA; } - + private void limitWithNoSV(int recordCount) { int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip)); recordsToSkip -= offset; @@ -211,5 +209,5 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { outgoingSv.clear(); super.cleanup(); } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java index 9d4a629..cef4101 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java @@ -17,27 +17,27 @@ */ package org.apache.drill.exec.physical.impl.materialize; +import io.netty.buffer.ByteBuf; + import java.util.Arrays; import java.util.List; -import com.google.common.collect.Lists; -import io.netty.buffer.ByteBuf; - import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryResult; import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.record.WritableBatch; + +import com.google.common.collect.Lists; public class QueryWritableBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class); - + private final QueryResult header; private final ByteBuf[] buffers; - - + + public QueryWritableBatch(QueryResult header, ByteBuf... buffers) { super(); this.header = header; @@ -55,7 +55,7 @@ public class QueryWritableBatch { } return n; } - + public QueryResult getHeader() { return header; } @@ -79,5 +79,5 @@ public class QueryWritableBatch { .build(); return new QueryWritableBatch(header); } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java index aa7f86e..221fc34 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java @@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.materialize; public interface RecordMaterializer { - + public QueryWritableBatch convertNext(boolean isLast); - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java index b4e4871..cc1b3bf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java @@ -35,7 +35,7 @@ public class VectorRecordMaterializer implements RecordMaterializer{ this.batch = batch; BatchSchema schema = batch.getSchema(); assert schema != null : "Schema must be defined."; - + // for (MaterializedField f : batch.getSchema()) { // logger.debug("New Field: {}", f); // } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java index 237f2f8..2885c52 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java @@ -17,18 +17,16 @@ */ package org.apache.drill.exec.physical.impl.mergereceiver; +import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM; + import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorAccessible; -import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM; - public interface MergingReceiverGeneratorBase { - + public abstract void doSetup(FragmentContext context, VectorAccessible incoming, VectorAccessible outgoing) throws SchemaChangeException; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java index 002e054..c29ef75 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java @@ -17,14 +17,12 @@ */ package org.apache.drill.exec.physical.impl.mergereceiver; +import javax.inject.Named; + import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorAccessible; -import javax.inject.Named; - public abstract class MergingReceiverTemplate implements MergingReceiverGeneratorBase { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingReceiverTemplate.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index b8e18af..cf2e36f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -18,13 +18,14 @@ package org.apache.drill.exec.physical.impl.mergereceiver; * limitations under the License. */ +import io.netty.buffer.ByteBuf; + import java.io.IOException; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.PriorityQueue; -import io.netty.buffer.ByteBuf; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; @@ -68,7 +69,6 @@ import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.vector.CopyUtil; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; import org.eigenbase.rel.RelFieldCollation.Direction; import parquet.Preconditions; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjector.java index 2265150..3213d11 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjector.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.physical.impl.orderedpartitioner; +import java.util.List; + import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; @@ -26,8 +28,6 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; -import java.util.List; - public interface OrderedPartitionProjector { public abstract void setup(FragmentContext context, VectorAccessible incoming, RecordBatch outgoing, List<TransferPair> transfers, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java index 3398443..f5068b4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java @@ -17,7 +17,10 @@ */ package org.apache.drill.exec.physical.impl.orderedpartitioner; -import com.google.common.collect.ImmutableList; +import java.util.List; + +import javax.inject.Named; + import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; @@ -30,8 +33,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.IntVector; -import javax.inject.Named; -import java.util.List; +import com.google.common.collect.ImmutableList; public abstract class OrderedPartitionProjectorTemplate implements OrderedPartitionProjector { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionProjectorTemplate.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java index 73fcd1f..1371e1c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java @@ -17,16 +17,16 @@ */ package org.apache.drill.exec.physical.impl.orderedpartitioner; +import javax.inject.Named; + import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.selection.SelectionVector4; -import javax.inject.Named; - public abstract class SampleCopierTemplate implements SampleCopier { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SampleCopierTemplate.class); - + private SelectionVector4 sv4; private int outputRecords = 0; @@ -41,7 +41,7 @@ public abstract class SampleCopierTemplate implements SampleCopier { public int getOutputRecords() { return outputRecords; } - + @Override public boolean copyRecords(int skip, int start, int total) { @@ -57,10 +57,10 @@ public abstract class SampleCopierTemplate implements SampleCopier { } return true; } - + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing); public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSortTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSortTemplate.java index 5a83541..3d5c548 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSortTemplate.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.physical.impl.orderedpartitioner; +import javax.inject.Named; + import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; @@ -25,8 +27,6 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.QuickSort; -import javax.inject.Named; - public abstract class SampleSortTemplate implements SampleSorter, IndexedSortable{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SampleSortTemplate.class); @@ -36,7 +36,7 @@ public abstract class SampleSortTemplate implements SampleSorter, IndexedSortabl this.vector2 = vector2; doSetup(context, sampleBatch, null); } - + @Override public void sort(SelectionVector2 vector2, VectorContainer container){ QuickSort qs = new QuickSort(); @@ -49,7 +49,7 @@ public abstract class SampleSortTemplate implements SampleSorter, IndexedSortabl vector2.setIndex(sv0, vector2.getIndex(sv1)); vector2.setIndex(sv1, tmp); } - + @Override public int compare(int leftIndex, int rightIndex) { return doEval(leftIndex, rightIndex); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSorter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSorter.java index b58f15e..1eb5790 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSorter.java @@ -27,7 +27,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2; public interface SampleSorter { public void setup(FragmentContext context, RecordBatch hyperBatch) throws SchemaChangeException; public void sort(SelectionVector2 vector2, VectorContainer container); - + public static TemplateClassDefinition<SampleSorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<SampleSorter>(SampleSorter.class, SampleSortTemplate.class); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java index 2ced9dd..06fd115 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java @@ -17,13 +17,14 @@ */ package org.apache.drill.exec.physical.impl.partitionsender; +import java.util.List; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.RootCreator; import org.apache.drill.exec.physical.impl.RootExec; import org.apache.drill.exec.record.RecordBatch; -import java.util.List; public class PartitionSenderCreator implements RootCreator<HashPartitionSender> { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 14cf092..6ff0418 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -66,10 +66,10 @@ public class PartitionSenderRootExec extends BaseRootExec { private final AtomicIntegerArray remainingReceivers; private final AtomicInteger remaingReceiverCount; private volatile boolean done = false; - + long minReceiverRecordCount = Long.MAX_VALUE; long maxReceiverRecordCount = Long.MIN_VALUE; - + public enum Metric implements MetricDef { BATCHES_SENT, RECORDS_SENT, @@ -249,7 +249,7 @@ public class PartitionSenderRootExec extends BaseRootExec { } } } - + public void stop() { logger.debug("Partition sender stopping."); ok = false; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java index c5fe154..5ed9c39 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java @@ -17,6 +17,9 @@ */ package org.apache.drill.exec.physical.impl.partitionsender; +import java.io.IOException; +import java.util.List; + import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; @@ -26,9 +29,6 @@ import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.record.RecordBatch; -import java.io.IOException; -import java.util.List; - public interface Partitioner { public abstract void setup(FragmentContext context, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index 3141aed..338a704 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -35,7 +35,6 @@ import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.proto.ExecProtos; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -324,7 +323,7 @@ public abstract class PartitionerTemplate implements Partitioner { throw new IOException(statusHandler.getException()); } } - + public void updateStats(FragmentWritableBatch writableBatch) { stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount()); stats.addLongStat(Metric.BATCHES_SENT, 1); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java index 7c3fd52..469140c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.partitionsender; import io.netty.buffer.ByteBuf; + import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.SendingAccountor; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index 91d3647..051a590 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -17,8 +17,11 @@ */ package org.apache.drill.exec.physical.impl.producer; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; + import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; @@ -29,20 +32,11 @@ import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.vector.ValueVector; -import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingDeque; - public class ProducerConsumerBatch extends AbstractRecordBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProducerConsumerBatch.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java index 0fcf4f3..c568ed4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java @@ -17,16 +17,15 @@ */ package org.apache.drill.exec.physical.impl.producer; -import com.google.common.collect.Iterables; +import java.util.List; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.physical.config.ProducerConsumer; import org.apache.drill.exec.physical.impl.BatchCreator; -import org.apache.drill.exec.physical.impl.producer.ProducerConsumerBatch; import org.apache.drill.exec.record.RecordBatch; -import java.util.List; +import com.google.common.collect.Iterables; public class ProducerConsumerBatchCreator implements BatchCreator<ProducerConsumer> { @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 734088e..ec29cac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -54,7 +54,6 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.planner.StarColumnHelper; import org.apache.drill.exec.record.AbstractSingleRecordBatch; -import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; @@ -82,8 +81,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ private int recordCount; private static final String EMPTY_STRING = ""; - - private class ClassifierResult { + + private class ClassifierResult { public boolean isStar = false; public List<String> outputNames; public String prefix = ""; @@ -91,13 +90,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ public CaseInsensitiveMap outputMap = new CaseInsensitiveMap(); private CaseInsensitiveMap sequenceMap = new CaseInsensitiveMap(); - private void clear() { + private void clear() { isStar = false; prefix = ""; - if (outputNames != null) { + if (outputNames != null) { outputNames.clear(); } - + // note: don't clear the internal maps since they have cumulative data.. } } @@ -128,6 +127,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ return super.innerNext(); } + @Override public VectorContainer getOutgoingContainer() { return this.container; } @@ -262,17 +262,17 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ IntOpenHashSet transferFieldIds = new IntOpenHashSet(); boolean isAnyWildcard = false; - - ClassifierResult result = new ClassifierResult(); + + ClassifierResult result = new ClassifierResult(); boolean classify = isClassificationNeeded(exprs); - + for(int i = 0; i < exprs.size(); i++){ final NamedExpression namedExpression = exprs.get(i); result.clear(); - + if (classify && namedExpression.getExpr() instanceof SchemaPath) { classifyExpr(namedExpression, incoming, result); - + if (result.isStar) { isAnyWildcard = true; Integer value = result.prefixMap.get(result.prefix); @@ -284,12 +284,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ if (k > result.outputNames.size()-1) { assert false; } - String name = result.outputNames.get(k++); // get the renamed column names + String name = result.outputNames.get(k++); // get the renamed column names if (name == EMPTY_STRING) continue; FieldReference ref = new FieldReference(name); TransferPair tp = wrapper.getValueVector().getTransferPair(ref); transfers.add(tp); - container.add(tp.getTo()); + container.add(tp.getTo()); } } else if (value != null && value.intValue() > 1) { // subsequent wildcards should do a copy of incoming valuevectors int k = 0; @@ -305,7 +305,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incoming, collector, context.getFunctionRegistry() ); if(collector.hasErrors()){ throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); - } + } MaterializedField outputField = MaterializedField.create(name, expr.getMajorType()); ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator()); @@ -330,7 +330,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } } } - + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true); final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType()); if(collector.hasErrors()){ @@ -426,7 +426,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ NameSegment ref = ex.getRef().getRootSegment(); boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN); - + if (refHasPrefix || exprContainsStar) { needed = true; break; @@ -434,19 +434,19 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } return needed; } - + private String getUniqueName(String name, ClassifierResult result) { Integer currentSeq = (Integer) result.sequenceMap.get(name); if (currentSeq == null) { // name is unique, so return the original name Integer n = -1; result.sequenceMap.put(name, n); - return name; + return name; } // create a new name Integer newSeq = currentSeq + 1; - result.sequenceMap.put(name, newSeq); - - String newName = name + newSeq; + result.sequenceMap.put(name, newSeq); + + String newName = name + newSeq; return newName; } @@ -462,8 +462,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ result.outputNames.add(EMPTY_STRING); } } - - private void classifyExpr(NamedExpression ex, RecordBatch incoming, ClassifierResult result) { + + private void classifyExpr(NamedExpression ex, RecordBatch incoming, ClassifierResult result) { NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment(); NameSegment ref = ex.getRef().getRootSegment(); boolean exprHasPrefix = expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); @@ -475,11 +475,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ String exprPrefix = EMPTY_STRING; String exprSuffix = expr.getPath(); - + if (exprHasPrefix) { // get the prefix of the expr String[] exprComponents = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2); - assert(exprComponents.length == 2); + assert(exprComponents.length == 2); exprPrefix = exprComponents[0]; exprSuffix = exprComponents[1]; result.prefix = exprPrefix; @@ -496,18 +496,18 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ result.prefixMap.put(exprPrefix, n); } } - + int incomingSchemaSize = incoming.getSchema().getFieldCount(); - // for debugging.. + // for debugging.. // if (incomingSchemaSize > 9) { // assert false; // } - + // input is '*' and output is 'prefix_*' - if (exprIsStar && refHasPrefix && refEndsWithStar) { + if (exprIsStar && refHasPrefix && refEndsWithStar) { String[] components = ref.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2); - assert(components.length == 2); + assert(components.length == 2); String prefix = components[0]; result.outputNames = Lists.newArrayList(); for(VectorWrapper<?> wrapper : incoming) { @@ -518,7 +518,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ String newName = prefix + StarColumnHelper.PREFIX_DELIMITER + name; addToResultMaps(newName, result, false); } - } + } // input and output are the same else if (expr.getPath().equals(ref.getPath())) { if (exprContainsStar && exprHasPrefix) { @@ -533,16 +533,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ for(VectorWrapper<?> wrapper : incoming) { ValueVector vvIn = wrapper.getValueVector(); String incomingName = vvIn.getField().getPath().getRootSegment().getPath(); - // get the prefix of the name + // get the prefix of the name String[] nameComponents = incomingName.split(StarColumnHelper.PREFIX_DELIMITER, 2); // if incoming valuevector does not have a prefix, ignore it since this expression is not referencing it if (nameComponents.length <= 1) { k++; continue; - } + } String namePrefix = nameComponents[0]; if (exprPrefix.equals(namePrefix)) { - String newName = incomingName; + String newName = incomingName; if (!result.outputMap.containsKey(newName)) { result.outputNames.set(k, newName); result.outputMap.put(newName, newName); @@ -593,13 +593,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ if (components.length <= 1) { k++; continue; - } + } String namePrefix = components[0]; String nameSuffix = components[1]; if (exprPrefix.equals(namePrefix)) { if (refContainsStar) { - // remove the prefix from the incoming column names - String newName = getUniqueName(nameSuffix, result); // for top level we need to make names unique + // remove the prefix from the incoming column names + String newName = getUniqueName(nameSuffix, result); // for top level we need to make names unique result.outputNames.set(k, newName); } else if (exprSuffix.equals(nameSuffix)) { // example: ref: $f1, expr: T0<PREFIX><column_name> @@ -611,15 +611,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } k++; } - } + } // input and output have prefixes although they could be different... else if (exprHasPrefix && refHasPrefix) { String[] input = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2); - assert(input.length == 2); - assert false : "Unexpected project expression or reference"; // not handled yet - } + assert(input.length == 2); + assert false : "Unexpected project expression or reference"; // not handled yet + } else { - // if the incoming schema's column name matches the expression name of the Project, + // if the incoming schema's column name matches the expression name of the Project, // then we just want to pick the ref name as the output column name result.outputNames = Lists.newArrayListWithCapacity(incomingSchemaSize); for (int j=0; j < incomingSchemaSize; j++) { @@ -629,10 +629,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ int k = 0; for(VectorWrapper<?> wrapper : incoming) { ValueVector vvIn = wrapper.getValueVector(); - String incomingName = vvIn.getField().getPath().getRootSegment().getPath(); + String incomingName = vvIn.getField().getPath().getRootSegment().getPath(); if (expr.getPath().equals(incomingName)) { - String newName = ref.getPath(); + String newName = ref.getPath(); if (!result.outputMap.containsKey(newName)) { result.outputNames.set(k, newName); result.outputMap.put(newName, newName); @@ -640,7 +640,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } k++; } - } + } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java index 02cad5a..8116869 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java @@ -35,11 +35,11 @@ import com.google.common.collect.Lists; */ public class RecordBatchData { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchData.class); - + private SelectionVector2 sv2; private int recordCount; VectorContainer container = new VectorContainer(); - + public RecordBatchData(VectorAccessible batch){ List<ValueVector> vectors = Lists.newArrayList(); if (batch instanceof RecordBatch && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) { @@ -47,7 +47,7 @@ public class RecordBatchData { } else { this.sv2 = null; } - + for(VectorWrapper<?> v : batch){ if(v.isHyper()) throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch."); TransferPair tp = v.getValueVector().getTransferPair(); @@ -66,7 +66,7 @@ public class RecordBatchData { container = VectorContainer.canonicalize(container); container.buildSchema(mode); } - + public int getRecordCount(){ return recordCount; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index dbb547d..3a37491 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -32,7 +32,6 @@ import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Sort; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java index cd5079f..217acf2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java @@ -35,6 +35,6 @@ public class SortBatchCreator implements BatchCreator<Sort>{ Preconditions.checkArgument(children.size() == 1); return new SortBatch(config, context, children.iterator().next()); } - - + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/39990292/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java index e3971cb..593db0d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java @@ -17,10 +17,10 @@ */ package org.apache.drill.exec.physical.impl.sort; +import java.util.concurrent.TimeUnit; + import javax.inject.Named; -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; @@ -29,21 +29,22 @@ import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.QuickSort; -import java.util.concurrent.TimeUnit; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; public abstract class SortTemplate implements Sorter, IndexedSortable{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortTemplate.class); - + private SelectionVector4 vector4; - + public void setup(FragmentContext context, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException{ // we pass in the local hyperBatch since that is where we'll be reading data. Preconditions.checkNotNull(vector4); this.vector4 = vector4; doSetup(context, hyperBatch, null); } - + @Override public void sort(SelectionVector4 vector4, VectorContainer container){ Stopwatch watch = new Stopwatch(); @@ -59,7 +60,7 @@ public abstract class SortTemplate implements Sorter, IndexedSortable{ vector4.set(sv0, vector4.get(sv1)); vector4.set(sv1, tmp); } - + @Override public int compare(int leftIndex, int rightIndex) { int sv1 = vector4.get(leftIndex);