http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java index e9b3051..ace4f24 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java @@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.filter; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.Filter; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; -public class FilterBatchCreator implements BatchCreator<Filter>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterBatchCreator.class); - +public class FilterBatchCreator implements BatchCreator<Filter> { @Override - public FilterRecordBatch getBatch(FragmentContext context, Filter config, List<RecordBatch> children) + public FilterRecordBatch getBatch(ExecutorFragmentContext context, Filter config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new FilterRecordBatch(config, children.iterator().next(), context); } - - }
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index 1bdd097..f0b832a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -42,9 +42,7 @@ import org.apache.drill.exec.vector.ValueVector; import com.google.common.collect.Lists; -public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ - //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class); - +public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> { private SelectionVector2 sv2; private SelectionVector4 sv4; private Filterer filter; @@ -120,16 +118,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ * logic that handles SV4 + filter should always be pushed beyond sort so disabling * it in FilterPrel. * - - // set up the multi-batch selection vector - this.svAllocator = oContext.getAllocator().getNewPreAllocator(); - if (!svAllocator.preAllocate(incoming.getRecordCount()*4)) - throw new SchemaChangeException("Attempted to filter an SV4 which exceeds allowed memory (" + - incoming.getRecordCount() * 4 + " bytes)"); - sv4 = new SelectionVector4(svAllocator.getAllocation(), incoming.getRecordCount(), Character.MAX_VALUE); - this.filter = generateSV4Filterer(); - break; - */ + */ default: throw new UnsupportedOperationException(); } @@ -197,8 +186,6 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]); CodeGenerator<Filterer> codeGen = cg.getCodeGenerator(); codeGen.plainJavaCapable(true); - // Uncomment out this line to debug the generated code. -// cg.saveCodeForDebugging(true); final Filterer filter = context.getImplementationClass(codeGen); filter.setup(context, incoming, this, tx); return filter; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java index 74a5d16..db62d36 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java @@ -20,12 +20,12 @@ package org.apache.drill.exec.physical.impl.filter; import javax.inject.Named; import org.apache.drill.exec.compile.sig.CodeGeneratorSignature; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.FragmentContextImpl; import org.apache.drill.exec.record.RecordBatch; public interface FilterSignature extends CodeGeneratorSignature{ - public void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); + public void doSetup(@Named("context") FragmentContextImpl context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); public boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java index d014a2e..52533bd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java @@ -27,9 +27,7 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.selection.SelectionVector2; -public abstract class FilterTemplate2 implements Filterer{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate2.class); - +public abstract class FilterTemplate2 implements Filterer { private SelectionVector2 outgoingSelectionVector; private SelectionVector2 incomingSelectionVector; private SelectionVectorMode svMode; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java index fd1f9e6..4850cff 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java @@ -26,8 +26,6 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.selection.SelectionVector4; public abstract class FilterTemplate4 implements Filterer { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate4.class); - private SelectionVector4 outgoingSelectionVector; private SelectionVector4 incomingSelectionVector; private TransferPair[] transfers; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java index aa45f54..a3d03c2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java @@ -24,11 +24,9 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; public interface Filterer { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Filterer.class); + TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class); + TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class); - public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException; - public void filterBatch(int recordCount) throws SchemaChangeException; - - public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class); - public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class); + void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException; + void filterBatch(int recordCount) throws SchemaChangeException; } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java index 94203d8..bfda4f4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java @@ -20,21 +20,18 @@ package org.apache.drill.exec.physical.impl.flatten; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; -public class FlattenBatchCreator implements BatchCreator<FlattenPOP>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenBatchCreator.class); - +public class FlattenBatchCreator implements BatchCreator<FlattenPOP> { @Override - public FlattenRecordBatch getBatch(FragmentContext context, FlattenPOP config, List<RecordBatch> children) + public FlattenRecordBatch getBatch(ExecutorFragmentContext context, FlattenPOP config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new FlattenRecordBatch(config, children.iterator().next(), context); } - } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index 2aa841b..8be16ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -303,8 +303,6 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { final ClassGenerator<Flattener> cg = CodeGenerator.getRoot(Flattener.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); - // Uncomment out this line to debug the generated code. - // cg.getCodeGenerator().saveCodeForDebugging(true); final IntHashSet transferFieldIds = new IntHashSet(); final NamedExpression flattenExpr = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn())); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java index 5293060..392757e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java @@ -27,9 +27,11 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.vector.complex.RepeatedValueVector; public interface Flattener { - public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException; + TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = new TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class); - public interface Monitor { + void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException; + + interface Monitor { /** * Get the required buffer size for the specified number of records. * {@see ValueVector#getBufferSizeFor(int)} for the meaning of this. @@ -37,14 +39,14 @@ public interface Flattener { * @param recordCount the number of records processed so far * @return the buffer size the vectors report as being in use */ - public int getBufferSizeFor(int recordCount); - }; + int getBufferSizeFor(int recordCount); + } + + int flattenRecords(int recordCount, int firstOutputIndex, Monitor monitor); - public int flattenRecords(int recordCount, int firstOutputIndex, Monitor monitor); + void setFlattenField(RepeatedValueVector repeatedColumn); - public void setFlattenField(RepeatedValueVector repeatedColumn); - public RepeatedValueVector getFlattenField(); - public void resetGroupIndex(); + RepeatedValueVector getFlattenField(); - public static final TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = new TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class); + void resetGroupIndex(); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 7b679c0..e087bc8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -256,12 +256,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // No more output records, clean up and return state = BatchState.DONE; - // if (first) { - // return IterOutcome.OK_NEW_SCHEMA; - // } return IterOutcome.NONE; } catch (ClassTransformationException | SchemaChangeException | IOException e) { - context.fail(e); + context.getExecutorState().fail(e); killIncoming(false); return IterOutcome.STOP; } @@ -405,8 +402,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException { final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getOptions()); cg.plainJavaCapable(true); - // Uncomment out this line to debug the generated code. - // cg.saveCodeForDebugging(true); final ClassGenerator<HashJoinProbe> g = cg.getRoot(); // Generate the code to project build side records http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java index 1402769..a005559 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.join; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.HashJoinPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -30,7 +30,7 @@ import com.google.common.base.Preconditions; public class HashJoinBatchCreator implements BatchCreator<HashJoinPOP> { @Override - public HashJoinBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children) + public HashJoinBatch getBatch(ExecutorFragmentContext context, HashJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 2); return new HashJoinBatch(config, context, children.get(0), children.get(1)); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java index cc6bd55..4ef28e6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java @@ -30,7 +30,7 @@ import org.apache.drill.exec.record.VectorContainer; import org.apache.calcite.rel.core.JoinRelType; public interface HashJoinProbe { - public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class); + TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class); /* The probe side of the hash join can be in the following two states * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a @@ -40,15 +40,15 @@ public interface HashJoinProbe { * case we handle it internally by projecting the record if there isn't a match on the build side * 3. DONE: Once we have projected all possible records we are done */ - public static enum ProbeState { + enum ProbeState { PROBE_PROJECT, PROJECT_RIGHT, DONE } - public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, - int probeRecordCount, HashJoinBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper, - JoinRelType joinRelType); - public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing); - public abstract int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException; - public abstract void projectBuildRecord(int buildIndex, int outIndex); - public abstract void projectProbeRecord(int probeIndex, int outIndex); + void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, + int probeRecordCount, HashJoinBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper, + JoinRelType joinRelType); + void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing); + int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException; + void projectBuildRecord(int buildIndex, int outIndex); + void projectProbeRecord(int probeIndex, int outIndex); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java index 55322f8..95f7c3d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java @@ -24,15 +24,12 @@ import org.apache.drill.exec.record.VectorContainer; public interface JoinWorker { + TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class); - public static enum JoinOutcome { + enum JoinOutcome { NO_MORE_DATA, BATCH_RETURNED, SCHEMA_CHANGED, WAITING, FAILURE; } - public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException; - public boolean doJoin(JoinStatus status); - - public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>( - JoinWorker.class, JoinTemplate.class); - + void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException; + boolean doJoin(JoinStatus status); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 8ad3f84..1ed4722 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -199,7 +199,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { this.worker = generateNewWorker(); first = true; } catch (ClassTransformationException | IOException | SchemaChangeException e) { - context.fail(new SchemaChangeException(e)); + context.getExecutorState().fail(new SchemaChangeException(e)); kill(false); return IterOutcome.STOP; } finally { @@ -269,12 +269,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { right.kill(sendUpstream); } - private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException{ - + private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException { final ClassGenerator<JoinWorker> cg = CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); - // Uncomment out this line to debug the generated code. - // cg.getCodeGenerator().saveCodeForDebugging(true); final ErrorCollector collector = new ErrorCollectorImpl(); // Generate members and initialization code http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java index 24f5533..b24624e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.join; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -29,10 +29,8 @@ import org.apache.calcite.rel.core.JoinRelType; import com.google.common.base.Preconditions; public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinCreator.class); - @Override - public MergeJoinBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children) + public MergeJoinBatch getBatch(ExecutorFragmentContext context, MergeJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 2); if(config.getJoinType() == JoinRelType.RIGHT){ @@ -40,6 +38,5 @@ public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> { }else{ return new MergeJoinBatch(config, context, children.get(0), children.get(1)); } - } } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java index 2e708a6..ef4cab1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java @@ -20,14 +20,14 @@ package org.apache.drill.exec.physical.impl.join; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.NestedLoopJoinPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; public class NestedLoopJoinBatchCreator implements BatchCreator<NestedLoopJoinPOP> { @Override - public NestedLoopJoinBatch getBatch(FragmentContext context, NestedLoopJoinPOP config, List<RecordBatch> children) + public NestedLoopJoinBatch getBatch(ExecutorFragmentContext context, NestedLoopJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException { return new NestedLoopJoinBatch(config, context, children.get(0), children.get(1)); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java index f954e72..15e5275 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.limit; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -29,7 +29,7 @@ import com.google.common.collect.Iterables; public class LimitBatchCreator implements BatchCreator<Limit> { @Override - public LimitRecordBatch getBatch(FragmentContext context, Limit config, List<RecordBatch> children) + public LimitRecordBatch getBatch(ExecutorFragmentContext context, Limit config, List<RecordBatch> children) throws ExecutionSetupException { return new LimitRecordBatch(config, context, Iterables.getOnlyElement(children)); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index f9ceff2..7e5ff21 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -41,6 +41,7 @@ import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; +import org.apache.drill.exec.ops.ExchangeFragmentContext; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.physical.MinorFragmentEndpoint; @@ -96,7 +97,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> private RecordBatchLoader[] batchLoaders; private final RawFragmentBatchProvider[] fragProviders; - private final FragmentContext context; + private final ExchangeFragmentContext context; private VectorContainer outgoingContainer; private MergingReceiverGeneratorBase merger; private final MergingReceiverPOP config; @@ -109,12 +110,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> private RawFragmentBatch[] incomingBatches; private int[] batchOffsets; private PriorityQueue <Node> pqueue; - private RawFragmentBatch emptyBatch = null; private RawFragmentBatch[] tempBatchHolder; private long[] inputCounts; private long[] outputCounts; - public static enum Metric implements MetricDef{ + public enum Metric implements MetricDef { BYTES_RECEIVED, NUM_SENDERS, NEXT_WAIT_NANOS; @@ -125,7 +125,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } } - public MergingRecordBatch(final FragmentContext context, + public MergingRecordBatch(final ExchangeFragmentContext context, final MergingReceiverPOP config, final RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException { super(config, context, true, context.newOperatorContext(config)); @@ -210,11 +210,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> try { rawBatch = getNext(p); } catch (final IOException e) { - context.fail(e); + context.getExecutorState().fail(e); return IterOutcome.STOP; } } - if (rawBatch == null && !context.shouldContinue()) { + if (rawBatch == null && !context.getExecutorState().shouldContinue()) { clearBatches(rawBatches); return IterOutcome.STOP; } @@ -241,12 +241,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) { // Do nothing } - if (rawBatch == null && !context.shouldContinue()) { + if (rawBatch == null && !context.getExecutorState().shouldContinue()) { clearBatches(rawBatches); return IterOutcome.STOP; } } catch (final IOException e) { - context.fail(e); + context.getExecutorState().fail(e); clearBatches(rawBatches); return IterOutcome.STOP; } @@ -315,7 +315,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> // SchemaChangeException, so check/clean catch clause below. } catch(final SchemaChangeException e) { logger.error("MergingReceiver failed to load record batch from remote host. {}", e); - context.fail(e); + context.getExecutorState().fail(e); return IterOutcome.STOP; } batch.release(); @@ -328,7 +328,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> // Ensure all the incoming batches have the identical schema. // Note: RecordBatchLoader permutes the columns to obtain the same columns order for all batches. if (!isSameSchemaAmongBatches(batchLoaders)) { - context.fail(new SchemaChangeException("Incoming batches for merging receiver have different schemas!")); + context.getExecutorState().fail(new SchemaChangeException("Incoming batches for merging receiver have different schemas!")); return IterOutcome.STOP; } @@ -351,7 +351,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> merger = createMerger(); } catch (final SchemaChangeException e) { logger.error("Failed to generate code for MergingReceiver. {}", e); - context.fail(e); + context.getExecutorState().fail(e); return IterOutcome.STOP; } @@ -380,12 +380,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } else { batchLoaders[b].clear(); batchLoaders[b] = null; - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { return IterOutcome.STOP; } } } catch (IOException | SchemaChangeException e) { - context.fail(e); + context.getExecutorState().fail(e); return IterOutcome.STOP; } } @@ -418,11 +418,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId] : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]); - if (nextBatch == null && !context.shouldContinue()) { + if (nextBatch == null && !context.getExecutorState().shouldContinue()) { return IterOutcome.STOP; } } catch (final IOException e) { - context.fail(e); + context.getExecutorState().fail(e); return IterOutcome.STOP; } @@ -456,7 +456,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> // TODO: Clean: DRILL-2933: That load(...) no longer throws // SchemaChangeException, so check/clean catch clause below. } catch(final SchemaChangeException ex) { - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } incomingBatches[node.batchId].release(); @@ -548,7 +548,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } final RawFragmentBatch batch = getNext(i); if (batch == null) { - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { state = BatchState.STOP; } else { state = BatchState.DONE; @@ -605,7 +605,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> .setReceiver(context.getHandle()) .setSender(sender) .build(); - context.getControlTunnel(providingEndpoint.getEndpoint()).informReceiverFinished(new OutcomeListener(), finishedReceiver); + context.getController() + .getTunnel(providingEndpoint.getEndpoint()) + .informReceiverFinished(new OutcomeListener(), finishedReceiver); } } @@ -624,10 +626,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> @Override public void interrupted(final InterruptedException e) { - if (context.shouldContinue()) { + if (context.getExecutorState().shouldContinue()) { final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message"; logger.error(errMsg, e); - context.fail(new RpcException(errMsg, e)); + context.getExecutorState().fail(new RpcException(errMsg, e)); } } } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index 7f662ae..9e82af8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -196,7 +196,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } } VectorContainer sortedSamples = new VectorContainer(); - builder.build(context, sortedSamples); + builder.build(sortedSamples); // Sort the records according the orderings given in the configuration @@ -262,7 +262,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart Thread.sleep(timeout); return true; } catch (final InterruptedException e) { - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { return false; } } @@ -329,7 +329,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } catch (final ClassTransformationException | IOException | SchemaChangeException ex) { kill(false); - context.fail(ex); + context.getExecutorState().fail(ex); return false; // TODO InterruptedException } @@ -349,7 +349,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart for (CachedVectorContainer w : mmap.get(mapKey)) { containerBuilder.add(w.get()); } - containerBuilder.build(context, allSamplesContainer); + containerBuilder.build(allSamplesContainer); List<Ordering> orderDefs = Lists.newArrayList(); int i = 0; @@ -390,7 +390,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart candidatePartitionTable.setRecordCount(copier.getOutputRecords()); @SuppressWarnings("resource") WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false); - wrap = new CachedVectorContainer(batch, context.getDrillbitContext().getAllocator()); + wrap = new CachedVectorContainer(batch, context.getAllocator()); tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES); } finally { candidatePartitionTable.clear(); @@ -486,7 +486,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } catch (SchemaChangeException ex) { kill(false); logger.error("Failure during query", ex); - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } doWork(vc); @@ -519,7 +519,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } catch (SchemaChangeException ex) { kill(false); logger.error("Failure during query", ex); - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } doWork(vc); @@ -550,7 +550,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } catch (SchemaChangeException ex) { kill(false); logger.error("Failure during query", ex); - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } // fall through. http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java index d2e07e7..5705aca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.orderedpartitioner; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.config.OrderedPartitionSender; import org.apache.drill.exec.physical.impl.RootCreator; @@ -61,7 +61,7 @@ public class OrderedPartitionSenderCreator implements RootCreator<OrderedPartiti @SuppressWarnings("resource") @Override - public RootExec getRoot(FragmentContext context, OrderedPartitionSender config, + public RootExec getRoot(ExecutorFragmentContext context, OrderedPartitionSender config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java index 06fd115..e0b7b9a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.partitionsender; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.RootCreator; import org.apache.drill.exec.physical.impl.RootExec; @@ -29,7 +29,7 @@ import org.apache.drill.exec.record.RecordBatch; public class PartitionSenderCreator implements RootCreator<HashPartitionSender> { @Override - public RootExec getRoot(FragmentContext context, + public RootExec getRoot(ExecutorFragmentContext context, HashPartitionSender config, List<RecordBatch> children) throws ExecutionSetupException { http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 108d539..25be50a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -33,9 +33,10 @@ import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.ops.AccountingDataTunnel; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExchangeFragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.ops.RootFragmentContext; import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.BaseRootExec; @@ -63,7 +64,7 @@ public class PartitionSenderRootExec extends BaseRootExec { private HashPartitionSender operator; private PartitionerDecorator partitioner; - private FragmentContext context; + private ExchangeFragmentContext context; private boolean ok = true; private final int outGoingBatchCount; private final HashPartitionSender popConfig; @@ -98,13 +99,13 @@ public class PartitionSenderRootExec extends BaseRootExec { } } - public PartitionSenderRootExec(FragmentContext context, + public PartitionSenderRootExec(RootFragmentContext context, RecordBatch incoming, HashPartitionSender operator) throws OutOfMemoryException { this(context, incoming, operator, false); } - public PartitionSenderRootExec(FragmentContext context, + public PartitionSenderRootExec(RootFragmentContext context, RecordBatch incoming, HashPartitionSender operator, boolean closeIncoming) throws OutOfMemoryException { @@ -173,7 +174,7 @@ public class PartitionSenderRootExec extends BaseRootExec { } catch (IOException e) { incoming.kill(false); logger.error("Error while creating partitioning sender or flushing outgoing batches", e); - context.fail(e); + context.getExecutorState().fail(e); } return false; @@ -203,19 +204,19 @@ public class PartitionSenderRootExec extends BaseRootExec { } catch (IOException e) { incoming.kill(false); logger.error("Error while flushing outgoing batches", e); - context.fail(e); + context.getExecutorState().fail(e); return false; } catch (SchemaChangeException e) { incoming.kill(false); logger.error("Error while setting up partitioner", e); - context.fail(e); + context.getExecutorState().fail(e); return false; } case OK: try { partitioner.partitionBatch(incoming); } catch (IOException e) { - context.fail(e); + context.getExecutorState().fail(e); incoming.kill(false); return false; } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java index 95a4813..5d1b08c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java @@ -22,34 +22,33 @@ import java.util.List; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExchangeFragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.record.RecordBatch; public interface Partitioner { + void setup(ExchangeFragmentContext context, + RecordBatch incoming, + HashPartitionSender popConfig, + OperatorStats stats, + OperatorContext oContext, + int start, int count) throws SchemaChangeException; - public abstract void setup(FragmentContext context, - RecordBatch incoming, - HashPartitionSender popConfig, - OperatorStats stats, - OperatorContext oContext, - int start, int count) throws SchemaChangeException; - - public abstract void partitionBatch(RecordBatch incoming) throws IOException; - public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException; - public abstract void initialize(); - public abstract void clear(); - public abstract List<? extends PartitionOutgoingBatch> getOutgoingBatches(); + void partitionBatch(RecordBatch incoming) throws IOException; + void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException; + void initialize(); + void clear(); + List<? extends PartitionOutgoingBatch> getOutgoingBatches(); /** * Method to get PartitionOutgoingBatch based on the fact that there can be > 1 Partitioner * @param index * @return PartitionOutgoingBatch that matches index within Partitioner. This method can * return null if index does not fall within boundary of this Partitioner */ - public abstract PartitionOutgoingBatch getOutgoingBatch(int index); - public abstract OperatorStats getStats(); + PartitionOutgoingBatch getOutgoingBatch(int index); + OperatorStats getStats(); - public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class); + TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java index 042222a..78b8d03 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java @@ -57,7 +57,7 @@ public class PartitionerDecorator { this.partitioners = partitioners; this.stats = stats; this.context = context; - this.executor = context.getDrillbitContext().getExecutor(); + this.executor = context.getExecutor(); this.tName = Thread.currentThread().getName(); this.childThreadPrefix = "Partitioner-" + tName + "-"; } @@ -177,7 +177,7 @@ public class PartitionerDecorator { break; } catch (final InterruptedException e) { // If the fragment state says we shouldn't continue, cancel or interrupt partitioner threads - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { logger.debug("Interrupting partioner threads. Fragment thread {}", tName); for(Future<?> f : taskFutures) { f.cancel(true); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index aa72c44..0d52b53 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -29,6 +29,7 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.ExchangeFragmentContext; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; @@ -81,7 +82,7 @@ public abstract class PartitionerTemplate implements Partitioner { } @Override - public final void setup(FragmentContext context, + public final void setup(ExchangeFragmentContext context, RecordBatch incoming, HashPartitionSender popConfig, OperatorStats stats, http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index 3afa852..bbcb758 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -68,8 +68,8 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer> wrapper = queue.take(); logger.debug("Got batch from queue"); } catch (final InterruptedException e) { - if (context.shouldContinue()) { - context.fail(e); + if (context.getExecutorState().shouldContinue()) { + context.getExecutorState().fail(e); } return IterOutcome.STOP; // TODO InterruptedException http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java index 6542576..779728a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.producer; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.ProducerConsumer; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -29,7 +29,7 @@ import com.google.common.collect.Iterables; public class ProducerConsumerBatchCreator implements BatchCreator<ProducerConsumer> { @Override - public ProducerConsumerBatch getBatch(FragmentContext context, ProducerConsumer config, List<RecordBatch> children) + public ProducerConsumerBatch getBatch(ExecutorFragmentContext context, ProducerConsumer config, List<RecordBatch> children) throws ExecutionSetupException { return new ProducerConsumerBatch(config, context, Iterables.getOnlyElement(children)); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java index f249540..73ab441 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.project; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.ComplexToJson; import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.physical.impl.BatchCreator; @@ -29,15 +29,12 @@ import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; public class ComplexToJsonBatchCreator implements BatchCreator<ComplexToJson> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexToJsonBatchCreator.class); - @Override - public ProjectRecordBatch getBatch(FragmentContext context, ComplexToJson flatten, List<RecordBatch> children) + public ProjectRecordBatch getBatch(ExecutorFragmentContext context, ComplexToJson flatten, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new ProjectRecordBatch(new Project(null, flatten.getChild()), children.iterator().next(), context); } - } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java index e7a6b05..37753cd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.project; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -28,13 +28,10 @@ import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; public class ProjectBatchCreator implements BatchCreator<Project>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectBatchCreator.class); - @Override - public ProjectRecordBatch getBatch(FragmentContext context, Project config, List<RecordBatch> children) + public ProjectRecordBatch getBatch(ExecutorFragmentContext context, Project config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new ProjectRecordBatch(config, children.iterator().next(), context); } - } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index be0f61f..89e0ee9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -804,7 +804,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { } catch (SchemaChangeException e) { kill(false); logger.error("Failure during query", e); - context.fail(e); + context.getExecutorState().fail(e); return IterOutcome.STOP; } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index d711592..f38b62e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -132,7 +132,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { return IterOutcome.NONE; } - builder.build(context, container); + builder.build(container); sorter = createNewSorter(); sorter.setup(context, getSelectionVector4(), this.container); sorter.sort(getSelectionVector4(), this.container); @@ -142,7 +142,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { } catch(SchemaChangeException | ClassTransformationException | IOException ex) { kill(false); logger.error("Failure during query", ex); - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java index 559558f..ccd5561 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.sort; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -28,14 +28,10 @@ import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; public class SortBatchCreator implements BatchCreator<Sort>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortBatchCreator.class); - @Override - public SortBatch getBatch(FragmentContext context, Sort config, List<RecordBatch> children) + public SortBatch getBatch(ExecutorFragmentContext context, Sort config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new SortBatch(config, context, children.iterator().next()); } - - } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java index 6b3de25..6c66c01 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java @@ -27,7 +27,6 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.AllocationReservation; import org.apache.drill.exec.memory.BaseAllocator; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; @@ -136,10 +135,6 @@ public class SortRecordBatchBuilder implements AutoCloseable { return batches.isEmpty(); } - public void build(FragmentContext context, VectorContainer outputContainer) throws SchemaChangeException { - build(outputContainer); - } - @SuppressWarnings("resource") public void build(VectorContainer outputContainer) throws SchemaChangeException { outputContainer.clear(); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 4304c2c..66fe261 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -24,7 +24,6 @@ import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.CodeGenerator; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.record.AbstractSingleRecordBatch; @@ -235,10 +234,11 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect private Copier getGenerated4Copier() throws SchemaChangeException { Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE); - return getGenerated4Copier(incoming, context, oContext.getAllocator(), container, this, callBack); + return getGenerated4Copier(incoming, context, container, this, callBack); } - public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing, SchemaChangeCallBack callBack) throws SchemaChangeException{ + public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, VectorContainer container, RecordBatch outgoing, + SchemaChangeCallBack callBack) throws SchemaChangeException{ for(VectorWrapper<?> vv : batch){ @SuppressWarnings("resource") http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java index 9ab39a3..4bf5b5c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java @@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.svremover; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; -public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SVRemoverCreator.class); - +public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover> { @Override - public RemovingRecordBatch getBatch(FragmentContext context, SelectionVectorRemover config, List<RecordBatch> children) + public RemovingRecordBatch getBatch(ExecutorFragmentContext context, SelectionVectorRemover config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new RemovingRecordBatch(config, context, children.iterator().next()); } - - } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java index 40ef2bb..dd2f6db 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java @@ -21,19 +21,15 @@ package org.apache.drill.exec.physical.impl.trace; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; public class TraceBatchCreator implements BatchCreator<Trace> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class); - @Override - public TraceRecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children) + public TraceRecordBatch getBatch(ExecutorFragmentContext context, Trace config, List<RecordBatch> children) throws ExecutionSetupException { - // Preconditions.checkArgument(children.size() == 1); return new TraceRecordBatch(config, children.iterator().next(), context); } - } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java index 1ef3142..bdc1a3d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java @@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.union; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; -public class UnionAllBatchCreator implements BatchCreator<UnionAll>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllBatchCreator.class); - +public class UnionAllBatchCreator implements BatchCreator<UnionAll> { @Override - public UnionAllRecordBatch getBatch(FragmentContext context, UnionAll config, List<RecordBatch> children) + public UnionAllRecordBatch getBatch(ExecutorFragmentContext context, UnionAll config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() >= 1); return new UnionAllRecordBatch(config, children, context); } - - } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index 1d1ecb0..b4d0e77 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -129,7 +129,7 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { } } } catch (ClassTransformationException | IOException | SchemaChangeException ex) { - context.fail(ex); + context.getExecutorState().fail(ex); killIncoming(false); return IterOutcome.STOP; } @@ -168,8 +168,6 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { final ClassGenerator<UnionAller> cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); - // Uncomment out this line to debug the generated code. - // cg.getCodeGenerator().saveCodeForDebugging(true); int index = 0; for(VectorWrapper<?> vw : inputBatch) { http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index cfdc06d..9da8a4b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -25,6 +25,7 @@ import java.util.Iterator; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.ExchangeFragmentContext; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OperatorContext; @@ -57,7 +58,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { private final RecordBatchLoader batchLoader; private final RawFragmentBatchProvider fragProvider; - private final FragmentContext context; + private final ExchangeFragmentContext context; private BatchSchema schema; private final OperatorStats stats; private boolean first = true; @@ -74,7 +75,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { } } - public UnorderedReceiverBatch(final FragmentContext context, final RawFragmentBatchProvider fragProvider, final UnorderedReceiver config) throws OutOfMemoryException { + public UnorderedReceiverBatch(final ExchangeFragmentContext context, final RawFragmentBatchProvider fragProvider, final UnorderedReceiver config) throws OutOfMemoryException { this.fragProvider = fragProvider; this.context = context; // In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector, @@ -171,13 +172,13 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { if (batch == null) { batchLoader.clear(); - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { return IterOutcome.STOP; } return IterOutcome.NONE; } - if (context.isOverMemoryLimit()) { + if (context.getAllocator().isOverLimit()) { return IterOutcome.OUT_OF_MEMORY; } @@ -197,7 +198,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { return IterOutcome.OK; } } catch(SchemaChangeException | IOException ex) { - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } finally { stats.stopProcessing(); @@ -233,7 +234,9 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { .setReceiver(context.getHandle()) .setSender(sender) .build(); - context.getControlTunnel(providingEndpoint.getEndpoint()).informReceiverFinished(new OutcomeListener(), finishedReceiver); + context.getController() + .getTunnel(providingEndpoint.getEndpoint()) + .informReceiverFinished(new OutcomeListener(), finishedReceiver); } } @@ -252,12 +255,11 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { @Override public void interrupted(final InterruptedException e) { - if (context.shouldContinue()) { + if (context.getExecutorState().shouldContinue()) { final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message"; logger.error(errMsg, e); - context.fail(new RpcException(errMsg, e)); + context.getExecutorState().fail(new RpcException(errMsg, e)); } } } - } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java index 6d4f1d7..01a4588 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java @@ -20,18 +20,18 @@ package org.apache.drill.exec.physical.impl.unorderedreceiver; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.UnorderedReceiver; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.work.batch.IncomingBuffers; import org.apache.drill.exec.work.batch.RawBatchBuffer; -public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver>{ +public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver> { @SuppressWarnings("resource") @Override - public UnorderedReceiverBatch getBatch(FragmentContext context, UnorderedReceiver receiver, List<RecordBatch> children) + public UnorderedReceiverBatch getBatch(ExecutorFragmentContext context, UnorderedReceiver receiver, List<RecordBatch> children) throws ExecutionSetupException { assert children == null || children.isEmpty(); IncomingBuffers bufHolder = context.getBuffers(); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java index 4199191..e27f881 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java @@ -21,7 +21,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.IteratorValidator; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -32,8 +32,8 @@ public class IteratorValidatorCreator implements BatchCreator<IteratorValidator> static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorCreator.class); @Override - public IteratorValidatorBatchIterator getBatch(FragmentContext context, IteratorValidator config, - List<RecordBatch> children) + public IteratorValidatorBatchIterator getBatch(ExecutorFragmentContext context, IteratorValidator config, + List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); RecordBatch child = children.iterator().next(); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java index a8eddbc..c2bcab0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java @@ -22,7 +22,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.Values; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; @@ -30,11 +30,9 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.easy.json.JSONRecordReader; -import com.google.common.collect.Iterators; - public class ValuesBatchCreator implements BatchCreator<Values> { @Override - public ScanBatch getBatch(FragmentContext context, Values config, List<RecordBatch> children) + public ScanBatch getBatch(ExecutorFragmentContext context, Values config, List<RecordBatch> children) throws ExecutionSetupException { assert children.isEmpty(); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java index 59bc115..6ca9652 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java @@ -21,7 +21,7 @@ package org.apache.drill.exec.physical.impl.window; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -29,9 +29,8 @@ import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; public class WindowFrameBatchCreator implements BatchCreator<WindowPOP> { - @Override - public WindowFrameRecordBatch getBatch(FragmentContext context, WindowPOP config, List<RecordBatch> children) + public WindowFrameRecordBatch getBatch(ExecutorFragmentContext context, WindowPOP config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new WindowFrameRecordBatch(config, context, children.iterator().next());